Skip to content

iceoryx_hoofs/concurrent/resizeable_lockfree_queue.hpp🔗

Namespaces🔗

Name
iox
building block to easily create free function for logging in a library context
iox::concurrent

Classes🔗

Name
class iox::concurrent::ResizeableLockFreeQueue
implements a lock free queue (i.e. container with FIFO order) of elements of type T with a maximum capacity MaxCapacity. The capacity can be defined to be anything between 0 and MaxCapacity at construction time or later at runtime using setCapacity. This is even possible while concurrent push and pop operations are executed, i.e. the queue does not have to be empty. Only one thread will succeed setting its desired capacity if there are more threads trying to change the capacity at the same time (it is unpredictable which thread).

Source code🔗

// Copyright (c) 2020 - 2021 by Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

#ifndef IOX_HOOFS_CONCURRENT_RESIZEABLE_LOCKFREE_QUEUE_HPP
#define IOX_HOOFS_CONCURRENT_RESIZEABLE_LOCKFREE_QUEUE_HPP

#include "iceoryx_hoofs/concurrent/lockfree_queue.hpp"
#include "iceoryx_hoofs/cxx/type_traits.hpp"
#include "iceoryx_hoofs/cxx/vector.hpp"

#include <atomic>

namespace iox
{
namespace concurrent
{

// Remark: We use protected inheritance to make the base class methods inaccessible for the user.
// We cannot use virtual functions since we need to use this class in shared memory.
// Some of the methods need to be rewritten specifically for this class, others simply redirect
// the call to the base class.
//
// Since supporting the resize (setCapacity) functionality has an impact on the runtime even
// if the feature is not used, we provide a queue wihout resize functionality in an additional
// base class that can be used separately.
template <typename ElementType, uint64_t MaxCapacity>
class ResizeableLockFreeQueue : protected LockFreeQueue<ElementType, MaxCapacity>
{
  private:
    using Base = LockFreeQueue<ElementType, MaxCapacity>;

  public:
    using element_t = ElementType;
    static constexpr uint64_t MAX_CAPACITY = MaxCapacity;

    ResizeableLockFreeQueue() noexcept = default;
    ~ResizeableLockFreeQueue() noexcept = default;

    // deleted for now, can be implemented later if needed
    // note: concurrent copying or moving in lockfree fashion is nontrivial
    ResizeableLockFreeQueue(const ResizeableLockFreeQueue&) = delete;
    ResizeableLockFreeQueue(ResizeableLockFreeQueue&&) = delete;
    ResizeableLockFreeQueue& operator=(const ResizeableLockFreeQueue&) = delete;
    ResizeableLockFreeQueue& operator=(ResizeableLockFreeQueue&&) = delete;

    ResizeableLockFreeQueue(const uint64_t initialCapacity) noexcept;

    static constexpr uint64_t maxCapacity() noexcept;

    using Base::empty;
    using Base::pop;
    using Base::size;
    using Base::tryPush;

    uint64_t capacity() const noexcept;

    iox::cxx::optional<ElementType> push(const ElementType& value) noexcept;

    iox::cxx::optional<ElementType> push(ElementType&& value) noexcept;

    // overloads to set the capacity
    // 1) The most general one allows providing a removeHandler to specify remove behavior.
    //    This could e.g. be to store them in a container.
    // 2) The second overload discards removed elements.


    template <typename Function,
              typename = typename std::enable_if<cxx::is_invocable<Function, ElementType>::value>::type>
    bool setCapacity(const uint64_t newCapacity, Function&& removeHandler) noexcept;

    bool setCapacity(const uint64_t newCapacity) noexcept;

  private:
    using BufferIndex = typename Base::BufferIndex;
    std::atomic<uint64_t> m_capacity{MaxCapacity};
    // must be operator= otherwise it is undefined, see https://en.cppreference.com/w/cpp/atomic/ATOMIC_FLAG_INIT
    std::atomic_flag m_resizeInProgress = ATOMIC_FLAG_INIT;
    iox::cxx::vector<BufferIndex, MaxCapacity> m_unusedIndices;

    uint64_t increaseCapacity(const uint64_t toIncrease) noexcept;

    template <typename Function>
    uint64_t decreaseCapacity(const uint64_t toDecrease, Function&& removeHandler) noexcept;

    bool tryGetUsedIndex(BufferIndex& index) noexcept;

    template <typename T>
    iox::cxx::optional<ElementType> pushImpl(T&& value) noexcept;
};

} // namespace concurrent
} // namespace iox

#include "iceoryx_hoofs/internal/concurrent/lockfree_queue/resizeable_lockfree_queue.inl"

#endif // IOX_HOOFS_CONCURRENT_RESIZEABLE_LOCKFREE_QUEUE_HPP

Updated on 2 April 2022 at 16:37:47 CEST