Skip to content

iceoryx_posh/internal/popo/building_blocks/chunk_distributor.hpp🔗

Namespaces🔗

Name
iox
iox::popo

Classes🔗

Name
class iox::popo::ChunkDistributor
The ChunkDistributor is the low layer building block to send SharedChunks to a dynamic number of ChunkQueus. Together with the ChunkQueuePusher, the ChunkDistributor builds the infrastructure to exchange memory chunks between different data producers and consumers that could be located in different processes. Besides a modifiable container of ChunkQueues to which a SharedChunk can be deliverd, it holds a configurable history of last sent chunks. This allows to provide a newly added queue a number of last chunks to start from. This is needed for functionality known as latched topic in ROS or field in ara::com. A ChunkDistributor is used to build elements of higher abstraction layers that also do memory managemet and provide an API towards the real user.

Source code🔗

// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved.
// Copyright (c) 2021 by Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0
#ifndef IOX_POSH_POPO_BUILDING_BLOCKS_CHUNK_DISTRIBUTOR_HPP
#define IOX_POSH_POPO_BUILDING_BLOCKS_CHUNK_DISTRIBUTOR_HPP

#include "iceoryx_posh/internal/mepoo/shared_chunk.hpp"
#include "iceoryx_posh/internal/popo/building_blocks/chunk_distributor_data.hpp"
#include "iceoryx_posh/internal/popo/building_blocks/chunk_queue_pusher.hpp"
#include "iceoryx_utils/cxx/helplets.hpp"

#include <thread>

namespace iox
{
namespace popo
{
enum class ChunkDistributorError
{
    INVALID_STATE,
    QUEUE_CONTAINER_OVERFLOW,
    QUEUE_NOT_IN_CONTAINER
};

template <typename ChunkDistributorDataType>
class ChunkDistributor
{
  public:
    using MemberType_t = ChunkDistributorDataType;
    using ChunkQueueData_t = typename ChunkDistributorDataType::ChunkQueueData_t;
    using ChunkQueuePusher_t = typename ChunkDistributorDataType::ChunkQueuePusher_t;

    explicit ChunkDistributor(cxx::not_null<MemberType_t* const> chunkDistrubutorDataPtr) noexcept;

    ChunkDistributor(const ChunkDistributor& other) = delete;
    ChunkDistributor& operator=(const ChunkDistributor&) = delete;
    ChunkDistributor(ChunkDistributor&& rhs) = default;
    ChunkDistributor& operator=(ChunkDistributor&& rhs) = default;
    virtual ~ChunkDistributor() = default;

    cxx::expected<ChunkDistributorError> tryAddQueue(cxx::not_null<ChunkQueueData_t* const> queueToAdd,
                                                     const uint64_t requestedHistory = 0u) noexcept;

    cxx::expected<ChunkDistributorError> tryRemoveQueue(cxx::not_null<ChunkQueueData_t* const> queueToRemove) noexcept;

    void removeAllQueues() noexcept;

    bool hasStoredQueues() const noexcept;

    void deliverToAllStoredQueues(mepoo::SharedChunk chunk) noexcept;

    bool deliverToQueue(cxx::not_null<ChunkQueueData_t* const> queue, mepoo::SharedChunk chunk) noexcept;

    void addToHistoryWithoutDelivery(mepoo::SharedChunk chunk) noexcept;

    uint64_t getHistorySize() noexcept;

    uint64_t getHistoryCapacity() const noexcept;

    void clearHistory() noexcept;

    void cleanup() noexcept;

  protected:
    const MemberType_t* getMembers() const noexcept;
    MemberType_t* getMembers() noexcept;

  private:
    MemberType_t* m_chunkDistrubutorDataPtr{nullptr};
};

} // namespace popo
} // namespace iox

#include "iceoryx_posh/internal/popo/building_blocks/chunk_distributor.inl"

#endif // IOX_POSH_POPO_BUILDING_BLOCKS_CHUNK_DISTRIBUTOR_HPP

Updated on 17 June 2021 at 11:15:27 CEST