Skip to content

iceoryx_utils/internal/concurrent/lockfree_queue/resizeable_lockfree_queue.inl🔗

Namespaces🔗

Name
iox
building block to easily create free function for logging in a library context
iox::concurrent

Source code🔗

// Copyright (c) 2020 by Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

#include "iceoryx_utils/concurrent/resizeable_lockfree_queue.hpp"

namespace iox
{
namespace concurrent
{
template <typename ElementType, uint64_t MaxCapacity>
ResizeableLockFreeQueue<ElementType, MaxCapacity>::ResizeableLockFreeQueue(const uint64_t initialCapacity) noexcept
{
    setCapacity(initialCapacity);
}

template <typename ElementType, uint64_t MaxCapacity>
constexpr uint64_t ResizeableLockFreeQueue<ElementType, MaxCapacity>::maxCapacity() noexcept
{
    return MAX_CAPACITY;
}

template <typename ElementType, uint64_t MaxCapacity>
uint64_t ResizeableLockFreeQueue<ElementType, MaxCapacity>::capacity() const noexcept
{
    return m_capacity.load(std::memory_order_relaxed);
}


template <typename ElementType, uint64_t MaxCapacity>
bool ResizeableLockFreeQueue<ElementType, MaxCapacity>::setCapacity(const uint64_t newCapacity) noexcept
{
    auto removeHandler = [](const ElementType&) {};
    return setCapacity(newCapacity, removeHandler);
}

template <typename ElementType, uint64_t MaxCapacity>
template <typename Function, typename>
bool ResizeableLockFreeQueue<ElementType, MaxCapacity>::setCapacity(const uint64_t newCapacity,
                                                                    Function&& removeHandler) noexcept
{
    if (newCapacity > MAX_CAPACITY)
    {
        return false;
    }


    if (m_resizeInProgress.test_and_set(std::memory_order_acquire))
    {
        // at most one resize can be in progress at any time
        return false;
    }

    auto cap = capacity();

    while (cap != newCapacity)
    {
        if (cap < newCapacity)
        {
            auto toIncrease = newCapacity - cap;
            increaseCapacity(toIncrease); // return value does not matter, we check the capacity later
        }
        else
        {
            auto toDecrease = cap - newCapacity;
            decreaseCapacity(toDecrease, removeHandler); // return value does not matter, we check the capacity later
        }

        cap = capacity();
    }

    // sync everything related to capacity change, e.g. the new capacity stored in m_capacity
    m_resizeInProgress.clear(std::memory_order_release);
    return true;
}

template <typename ElementType, uint64_t MaxCapacity>
uint64_t ResizeableLockFreeQueue<ElementType, MaxCapacity>::increaseCapacity(const uint64_t toIncrease) noexcept
{
    // we can be sure this is not called concurrently due to the m_resizeInProgress flag
    //(this must be ensured as the vector is modified)
    uint64_t increased = 0U;
    while (increased < toIncrease)
    {
        if (m_unusedIndices.empty())
        {
            // no indices left to increase capacity
            return increased;
        }
        ++increased;
        m_capacity.fetch_add(1U);
        Base::m_freeIndices.push(m_unusedIndices.back());
        m_unusedIndices.pop_back();
    }

    return increased;
}

template <typename ElementType, uint64_t MaxCapacity>
template <typename Function>
uint64_t ResizeableLockFreeQueue<ElementType, MaxCapacity>::decreaseCapacity(const uint64_t toDecrease,
                                                                             Function&& removeHandler) noexcept
{
    uint64_t decreased = 0U;
    while (decreased < toDecrease)
    {
        BufferIndex index;
        while (decreased < toDecrease)
        {
            if (!Base::m_freeIndices.pop(index))
            {
                break;
            }

            m_unusedIndices.push_back(index);
            ++decreased;
            if (m_capacity.fetch_sub(1U) == 1U)
            {
                // we reached capacity 0 and cannot further decrease it
                return decreased;
            }
        }

        // no free indices, try the used ones
        while (decreased < toDecrease)
        {
            // remark: just calling pop to create free space is not sufficent in a concurrent scenario
            // we want to make sure no one else gets the index once we have it
            if (!tryGetUsedIndex(index))
            {
                // try the free ones again
                break;
            }

            auto result = Base::readBufferAt(index);
            removeHandler(result.value());
            m_unusedIndices.push_back(index);

            ++decreased;
            if (m_capacity.fetch_sub(1U) == 1U)
            {
                // we reached capacity 0 and cannot further decrease it
                return decreased;
            }
        }
    }
    return decreased;
}

template <typename ElementType, uint64_t MaxCapacity>
bool ResizeableLockFreeQueue<ElementType, MaxCapacity>::tryGetUsedIndex(BufferIndex& index) noexcept
{

    return Base::m_usedIndices.popIfSizeIsAtLeast(capacity(), index);
}

template <typename ElementType, uint64_t MaxCapacity>
iox::cxx::optional<ElementType>
ResizeableLockFreeQueue<ElementType, MaxCapacity>::push(const ElementType& value) noexcept
{
    return pushImpl(std::forward<const ElementType>(value));
}

template <typename ElementType, uint64_t MaxCapacity>
iox::cxx::optional<ElementType> ResizeableLockFreeQueue<ElementType, MaxCapacity>::push(ElementType&& value) noexcept
{
    return pushImpl(std::forward<ElementType>(value));
}

template <typename ElementType, uint64_t MaxCapacity>
template <typename T>
iox::cxx::optional<ElementType> ResizeableLockFreeQueue<ElementType, MaxCapacity>::pushImpl(T&& value) noexcept
{
    cxx::optional<ElementType> evictedValue;

    BufferIndex index;

    while (!Base::m_freeIndices.pop(index))
    {
        if (tryGetUsedIndex(index))
        {
            evictedValue = Base::readBufferAt(index);
            break;
        }
        // if m_usedIndices was not full we try again (m_freeIndices should contain an index in this case)
        // note that it is theoretically possible to be unsuccessful indefinitely
        // (and thus we would have an infinite loop)
        // but this requires a timing of concurrent pushes and pops which is exceptionally unlikely in practice
    }

    // if we removed from a full queue via popIfFull it might not be full anymore when a concurrent pop occurs

    Base::writeBufferAt(index, value);

    Base::m_usedIndices.push(index);

    return evictedValue; // value was moved into the queue, if a value was evicted to do so return it
}

} // namespace concurrent
} // namespace iox

Updated on 31 May 2022 at 15:52:34 CEST