iox::popo::ChunkDistributor🔗
The ChunkDistributor is the low layer building block to send SharedChunks to a dynamic number of ChunkQueus. Together with the ChunkQueuePusher, the ChunkDistributor builds the infrastructure to exchange memory chunks between different data producers and consumers that could be located in different processes. Besides a modifiable container of ChunkQueues to which a SharedChunk can be deliverd, it holds a configurable history of last sent chunks. This allows to provide a newly added queue a number of last chunks to start from. This is needed for functionality known as latched topic in ROS or field in ara::com. A ChunkDistributor is used to build elements of higher abstraction layers that also do memory managemet and provide an API towards the real user. More...
#include <chunk_distributor.hpp>
Public Types🔗
Name | |
---|---|
using ChunkDistributorDataType | MemberType_t |
using typename ChunkDistributorDataType::ChunkQueueData_t | ChunkQueueData_t |
using typename ChunkDistributorDataType::ChunkQueuePusher_t | ChunkQueuePusher_t |
Public Functions🔗
Name | |
---|---|
ChunkDistributor(cxx::not_null< MemberType_t *const > chunkDistrubutorDataPtr) | |
ChunkDistributor(const ChunkDistributor & other) | |
ChunkDistributor & | operator=(const ChunkDistributor & ) |
ChunkDistributor(ChunkDistributor && rhs) =default | |
ChunkDistributor & | operator=(ChunkDistributor && rhs) =default |
virtual | ~ChunkDistributor() =default |
cxx::expected< ChunkDistributorError > | tryAddQueue(cxx::not_null< ChunkQueueData_t *const > queueToAdd, const uint64_t requestedHistory =0u) Add a queue to the internal list of chunk queues to which chunks are delivered when calling deliverToAllStoredQueues. |
cxx::expected< ChunkDistributorError > | tryRemoveQueue(cxx::not_null< ChunkQueueData_t *const > queueToRemove) Remove a queue from the internal list of chunk queues. |
void | removeAllQueues() Delete all the stored chunk queues. |
bool | hasStoredQueues() const Get the information whether there are any stored chunk queues. |
void | deliverToAllStoredQueues(mepoo::SharedChunk chunk) Deliver the provided shared chunk to all the stored chunk queues. The chunk will be added to the chunk history. |
bool | deliverToQueue(cxx::not_null< ChunkQueueData_t *const > queue, mepoo::SharedChunk chunk) Deliver the provided shared chunk to the provided chunk queue. The chunk will NOT be added to the chunk history. |
void | addToHistoryWithoutDelivery(mepoo::SharedChunk chunk) Update the chunk history but do not deliver the chunk to any chunk queue. E.g. use case is to to update a non offered field in ara. |
uint64_t | getHistorySize() Get the current size of the chunk history. |
uint64_t | getHistoryCapacity() const Get the capacity of the chunk history. |
void | clearHistory() Clears the chunk history. |
void | cleanup() cleanup the used shrared memory chunks |
Protected Functions🔗
Name | |
---|---|
const MemberType_t * | getMembers() const |
MemberType_t * | getMembers() |
Detailed Description🔗
template <typename ChunkDistributorDataType >
class iox::popo::ChunkDistributor;
The ChunkDistributor is the low layer building block to send SharedChunks to a dynamic number of ChunkQueus. Together with the ChunkQueuePusher, the ChunkDistributor builds the infrastructure to exchange memory chunks between different data producers and consumers that could be located in different processes. Besides a modifiable container of ChunkQueues to which a SharedChunk can be deliverd, it holds a configurable history of last sent chunks. This allows to provide a newly added queue a number of last chunks to start from. This is needed for functionality known as latched topic in ROS or field in ara::com. A ChunkDistributor is used to build elements of higher abstraction layers that also do memory managemet and provide an API towards the real user.
Todo: There are currently some challenge: For the stored queues and the history, containers are used which are not thread safe. Therefore we use an inter-process mutex. But this can lead to deadlocks if a user process gets terminated while one of its threads is in the ChunkDistributor and holds a lock. An easier setup would be if changing the queues by a middleware thread and sending chunks by the user process would not interleave. I.e. there is no concurrent access to the containers. Then a memory synchronization would be sufficient. The cleanup() call is the biggest challenge. This is used to free chunks that are still held by a not properly terminated user application. Even if access from middleware and user threads do not overlap, the history container to cleanup could be in an inconsistent state as the application was hard terminated while changing it. We would need a container like the UsedChunkList to have one that is robust against such inconsistencies.... A perfect job for our future selves
About Concurrency: This ChunkDistributor can be used with different LockingPolicies for different scenarios When different threads operate on it (e.g. application sends chunks and RouDi adds and removes queues), a locking policy must be used that ensures consistent data in the ChunkDistributorData.
Public Types Documentation🔗
using MemberType_t🔗
using iox::popo::ChunkDistributor< ChunkDistributorDataType >::MemberType_t = ChunkDistributorDataType;
using ChunkQueueData_t🔗
using iox::popo::ChunkDistributor< ChunkDistributorDataType >::ChunkQueueData_t = typename ChunkDistributorDataType::ChunkQueueData_t;
using ChunkQueuePusher_t🔗
using iox::popo::ChunkDistributor< ChunkDistributorDataType >::ChunkQueuePusher_t = typename ChunkDistributorDataType::ChunkQueuePusher_t;
Public Functions Documentation🔗
function ChunkDistributor🔗
inline explicit ChunkDistributor(
cxx::not_null< MemberType_t *const > chunkDistrubutorDataPtr
)
function ChunkDistributor🔗
ChunkDistributor(
const ChunkDistributor & other
)
function operator=🔗
ChunkDistributor & operator=(
const ChunkDistributor &
)
function ChunkDistributor🔗
ChunkDistributor(
ChunkDistributor && rhs
) =default
function operator=🔗
ChunkDistributor & operator=(
ChunkDistributor && rhs
) =default
function ~ChunkDistributor🔗
virtual ~ChunkDistributor() =default
function tryAddQueue🔗
inline cxx::expected< ChunkDistributorError > tryAddQueue(
cxx::not_null< ChunkQueueData_t *const > queueToAdd,
const uint64_t requestedHistory =0u
)
Add a queue to the internal list of chunk queues to which chunks are delivered when calling deliverToAllStoredQueues.
Parameters:
- queueToAdd chunk queue to add to the list
- requestedHistory number of last chunks from history to send if available. If history size is smaller then the available history size chunks are provided
Return: if the queue could be added it returns success, otherwiese a ChunkDistributor error
function tryRemoveQueue🔗
inline cxx::expected< ChunkDistributorError > tryRemoveQueue(
cxx::not_null< ChunkQueueData_t *const > queueToRemove
)
Remove a queue from the internal list of chunk queues.
Parameters:
- chunk queue to remove from the list
Return: if the queue could be removed it returns success, otherwiese a ChunkDistributor error
function removeAllQueues🔗
inline void removeAllQueues()
Delete all the stored chunk queues.
function hasStoredQueues🔗
inline bool hasStoredQueues() const
Get the information whether there are any stored chunk queues.
Return: true if there are stored chunk queues, false if not
function deliverToAllStoredQueues🔗
inline void deliverToAllStoredQueues(
mepoo::SharedChunk chunk
)
Deliver the provided shared chunk to all the stored chunk queues. The chunk will be added to the chunk history.
Parameters:
- shared chunk to be delivered
function deliverToQueue🔗
inline bool deliverToQueue(
cxx::not_null< ChunkQueueData_t *const > queue,
mepoo::SharedChunk chunk
)
Deliver the provided shared chunk to the provided chunk queue. The chunk will NOT be added to the chunk history.
Parameters:
- chunk queue to which this chunk shall be delivered
- shared chunk to be delivered
Return: false if a queue overflow occured, otherwise true
function addToHistoryWithoutDelivery🔗
inline void addToHistoryWithoutDelivery(
mepoo::SharedChunk chunk
)
Update the chunk history but do not deliver the chunk to any chunk queue. E.g. use case is to to update a non offered field in ara.
Parameters:
- shared chunk add to the chunk history
function getHistorySize🔗
inline uint64_t getHistorySize()
Get the current size of the chunk history.
Return: chunk history size
function getHistoryCapacity🔗
inline uint64_t getHistoryCapacity() const
Get the capacity of the chunk history.
Return: chunk history capacity
function clearHistory🔗
inline void clearHistory()
Clears the chunk history.
function cleanup🔗
inline void cleanup()
cleanup the used shrared memory chunks
Todocurrently we have a deadlock / mutex destroy vulnerability if the ThreadSafePolicy is used and a sending application dies when having the lock for sending. If the RouDi daemon wants to cleanup or does discovery changes we have a deadlock or an exception when destroying the mutex As long as we don't have a multi-threaded lock-free ChunkDistributor or another concept we die here
Protected Functions Documentation🔗
function getMembers🔗
inline const MemberType_t * getMembers() const
function getMembers🔗
MemberType_t * getMembers()
Updated on 31 May 2022 at 15:29:16 CEST