Skip to content

iceoryx_utils/internal/concurrent/lockfree_queue/index_queue.inl🔗

Namespaces🔗

Name
iox
building block to easily create free function for logging in a library context
iox::concurrent

Source code🔗

// Copyright (c) 2019, 2020 by Robert Bosch GmbH, Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

namespace iox
{
namespace concurrent
{
template <uint64_t Capacity, typename ValueType>
IndexQueue<Capacity, ValueType>::IndexQueue(ConstructEmpty_t) noexcept
    : m_readPosition(Index(Capacity))
    , m_writePosition(Index(Capacity))
{
    for (uint64_t i = 0U; i < Capacity; ++i)
    {
        m_cells[i].store(Index(0U), std::memory_order_relaxed);
    }
}

template <uint64_t Capacity, typename ValueType>
IndexQueue<Capacity, ValueType>::IndexQueue(ConstructFull_t) noexcept
    : m_readPosition(Index(0U))
    , m_writePosition(Index(Capacity))
{
    for (uint64_t i = 0U; i < Capacity; ++i)
    {
        m_cells[i].store(Index(i), std::memory_order_relaxed);
    }
}

template <uint64_t Capacity, typename ValueType>
constexpr uint64_t IndexQueue<Capacity, ValueType>::capacity() const noexcept
{
    return Capacity;
}

template <uint64_t Capacity, typename ValueType>
void IndexQueue<Capacity, ValueType>::push(const ValueType index) noexcept
{
    // we need the CAS loop here since we may fail due to concurrent push operations
    // note that we are always able to succeed to publish since we have
    // enough capacity for all unique indices used

    // case analyis
    // (1) loaded value is exactly one cycle behind:
    //     value is from the last cycle
    //     we can try to publish
    // (2) loaded value has the same cycle:
    //     some other push has published but not updated the write position
    //     help updating the write position
    // (3) loaded value is more than one cycle behind:
    //     this should only happen due to wrap around when push is interrupted for a long time
    //     reload write position and try again
    //     note that a complete wraparound can lead to a false detection of 1) (ABA problem)
    //     but this is very unlikely with e.g. a 64 bit value type
    // (4) loaded value is some cycle ahead:
    //     write position is outdated, there must have been other pushes concurrently
    //     reload write position and try again

    constexpr bool NotPublished = true;

    auto writePosition = m_writePosition.load(std::memory_order_relaxed);
    do
    {
        auto oldValue = loadvalueAt(writePosition, std::memory_order_relaxed);

        auto cellIsFree = oldValue.isOneCycleBehind(writePosition);

        if (cellIsFree)
        {
            // case (1)
            Index newValue(index, writePosition.getCycle());

            // if publish fails, another thread has published before us
            bool published = m_cells[writePosition.getIndex()].compare_exchange_weak(
                oldValue, newValue, std::memory_order_relaxed, std::memory_order_relaxed);

            if (published)
            {
                break;
            }
        }

        // even if we are not able to publish, we check whether some other push has already updated the writePosition
        // before trying again to publish
        auto writePositionRequiresUpdate = oldValue.getCycle() == writePosition.getCycle();

        if (writePositionRequiresUpdate)
        {
            // case (2)
            // the writePosition was not updated yet by another push but the value was already written
            // help with the update
            // (note that we do not care if it fails, then a retry or another push will handle it)

            Index newWritePosition(writePosition + 1U);
            m_writePosition.compare_exchange_strong(
                writePosition, newWritePosition, std::memory_order_relaxed, std::memory_order_relaxed);
        }
        else
        {
            // case (3) and (4)
            // note: we do not update with CAS here, the CAS is bound to fail anyway
            // (since our value of writePosition is not up to date so needs to be loaded again)
            writePosition = m_writePosition.load(std::memory_order_relaxed);
        }

    } while (NotPublished);

    Index newWritePosition(writePosition + 1U);

    // if this compare-exchange fails it is no problem, this only delays the update of m_writePosition
    // for other pushes which are able to do them on their own (if writePositionRequiresUpdate above is true)
    // no one else except popIfFull requires this update:
    // In this case it is also ok: the push is only complete once this update of m_writePosition was executed,
    // and the queue (logically) cannot be full until this happens.
    m_writePosition.compare_exchange_strong(
        writePosition, newWritePosition, std::memory_order_relaxed, std::memory_order_relaxed);
}

template <uint64_t Capacity, typename ValueType>
bool IndexQueue<Capacity, ValueType>::pop(ValueType& index) noexcept
{
    // we need the CAS loop here since we may fail due to concurrent pop operations
    // we leave when we detect an empty queue, otherwise we retry the pop operation

    // case analyis
    // (1) loaded value has the same cycle:
    //     value was not popped before
    //     try to get ownership
    // (2) loaded value is exactly one cycle behind:
    //     value is from the last cycle which means the queue is empty
    //     return false
    // (3) loaded value is more than one cycle behind:
    //     this should only happen due to wrap around when push is interrupted for a long time
    //     reload read position and try again
    // (4) loaded value is some cycle ahead:
    //     read position is outdated, there must have been pushes concurrently
    //     reload read position and try again

    bool ownershipGained = false;
    Index value;
    auto readPosition = m_readPosition.load(std::memory_order_relaxed);
    do
    {
        value = loadvalueAt(readPosition, std::memory_order_relaxed);

        // we only dequeue if value and readPosition are in the same cycle
        auto cellIsValidToRead = readPosition.getCycle() == value.getCycle();

        if (cellIsValidToRead)
        {
            // case (1)
            Index newReadPosition(readPosition + 1U);
            ownershipGained = m_readPosition.compare_exchange_weak(
                readPosition, newReadPosition, std::memory_order_relaxed, std::memory_order_relaxed);
        }
        else
        {
            // readPosition is ahead by one cycle, queue was empty at value load
            auto isEmpty = value.isOneCycleBehind(readPosition);

            if (isEmpty)
            {
                // case (2)
                return false;
            }

            // case (3) and (4) requires loading readPosition again
            readPosition = m_readPosition.load(std::memory_order_relaxed);
        }

        // readPosition is outdated, retry operation

    } while (!ownershipGained); // we leave if we gain ownership of readPosition

    index = value.getIndex();
    return true;
}

template <uint64_t Capacity, typename ValueType>
bool IndexQueue<Capacity, ValueType>::popIfFull(ValueType& index) noexcept
{
    // we do NOT need a CAS loop here since if we detect that the queue is not full
    // someone else popped an element and we do not retry to check whether it was filled AGAIN
    // concurrently (which will usually not be the case and then we would return false anyway)
    // if it is filled again we can (and will) retry popIfFull from the call site

    // the queue is full if and only if write position and read position are the same but read position is
    // one cycle behind write position
    // unfortunately it seems impossible in this design to check this condition without loading
    // write posiion and read position (which causes more contention)

    const auto writePosition = m_writePosition.load(std::memory_order_relaxed);
    auto readPosition = m_readPosition.load(std::memory_order_relaxed);
    const auto value = loadvalueAt(readPosition, std::memory_order_relaxed);

    auto isFull = writePosition.getIndex() == readPosition.getIndex() && readPosition.isOneCycleBehind(writePosition);

    if (isFull)
    {
        Index newReadPosition(readPosition + 1U);
        auto ownershipGained = m_readPosition.compare_exchange_strong(
            readPosition, newReadPosition, std::memory_order_relaxed, std::memory_order_relaxed);

        if (ownershipGained)
        {
            index = value.getIndex();
            return true;
        }
    }

    // otherwise someone else has dequeued an index and the queue was not full at the start of this popIfFull
    return false;
}

template <uint64_t Capacity, typename ValueType>
bool IndexQueue<Capacity, ValueType>::popIfSizeIsAtLeast(const uint64_t requiredSize, ValueType& index) noexcept
{
    if (requiredSize == 0)
    {
        return pop(index);
    }
    // which to load first should make no difference for correctness
    // but for performance it might
    // note that without sync mechanisms (such as seq_cst), reordering is possible
    const auto writePosition = m_writePosition.load(std::memory_order_relaxed);
    auto readPosition = m_readPosition.load(std::memory_order_relaxed);

    // if readPosition + n = readPosition for some n>=0, the queue contains n elements
    // at this instant (!) but slightly later may contain more or less elements
    // while the m_readPosition and m_writePosition can grow during this operation,
    // we detect this for readPosition with compare_exchange and for writePosition it does not matter,
    // the queue will contain even more elements then ( > n)
    const int64_t delta = writePosition - readPosition;

    // delta < 0 can actually happen (atomic values may not be up to date, i.e. detect writePosition as smaller than
    // readPosition leading to negative delta)
    // since we cannot conclude that the queue is filled with requiredSize elements in this case we just return
    //
    // note that delta is signed and we cannot compare it to requiredSize (unsigned) when it is negative
    // without getting unexpected results (it will be converted to large positive numbers)
    if (delta < 0)
    {
        return false;
    }

    // delta is positive, therefore the conversion is fine (it surely fits into uint64_t)
    if (static_cast<uint64_t>(delta) >= requiredSize)
    {
        auto value = loadvalueAt(readPosition, std::memory_order_relaxed);
        Index newReadPosition(readPosition + 1U);
        auto ownershipGained = m_readPosition.compare_exchange_strong(
            readPosition, newReadPosition, std::memory_order_relaxed, std::memory_order_relaxed);
        if (ownershipGained)
        {
            index = value.getIndex();
            return true;
        }
    }

    return false;
}

template <uint64_t Capacity, typename ValueType>
cxx::optional<ValueType> IndexQueue<Capacity, ValueType>::pop() noexcept
{
    ValueType value;
    if (pop(value))
    {
        return value;
    }
    return cxx::nullopt;
}

template <uint64_t Capacity, typename ValueType>
cxx::optional<ValueType> IndexQueue<Capacity, ValueType>::popIfFull() noexcept
{
    ValueType value;
    if (popIfFull(value))
    {
        return value;
    }
    return cxx::nullopt;
}

template <uint64_t Capacity, typename ValueType>
cxx::optional<ValueType> IndexQueue<Capacity, ValueType>::popIfSizeIsAtLeast(const uint64_t size) noexcept
{
    ValueType value;
    if (popIfSizeIsAtLeast(size, value))
    {
        return value;
    }
    return cxx::nullopt;
}

template <uint64_t Capacity, typename ValueType>
bool IndexQueue<Capacity, ValueType>::empty() const noexcept
{
    const auto readPosition = m_readPosition.load(std::memory_order_relaxed);
    const auto value = loadvalueAt(readPosition, std::memory_order_relaxed);

    // if m_readPosition is ahead by one cycle compared to the value stored at head,
    // the queue was empty at the time of the loads above (but might not be anymore!)
    return value.isOneCycleBehind(readPosition);
}

template <uint64_t Capacity, typename ValueType>
typename IndexQueue<Capacity, ValueType>::Index
IndexQueue<Capacity, ValueType>::loadvalueAt(const Index& position, const std::memory_order memoryOrder) const
{
    return m_cells[position.getIndex()].load(memoryOrder);
}

} // namespace concurrent
} // namespace iox

Updated on 31 May 2022 at 15:29:15 CEST