iceoryx_utils/internal/concurrent/lockfree_queue/lockfree_queue.inl🔗
Namespaces🔗
Name |
---|
iox building block to easily create free function for logging in a library context |
iox::concurrent |
Source code🔗
// Copyright (c) 2019, 2020 by Robert Bosch GmbH, Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0
#include "iceoryx_utils/cxx/optional.hpp"
#include <utility>
namespace iox
{
namespace concurrent
{
template <typename ElementType, uint64_t Capacity>
LockFreeQueue<ElementType, Capacity>::LockFreeQueue() noexcept
: m_freeIndices(IndexQueue<Capacity>::ConstructFull)
, m_usedIndices(IndexQueue<Capacity>::ConstructEmpty)
{
}
template <typename ElementType, uint64_t Capacity>
constexpr uint64_t LockFreeQueue<ElementType, Capacity>::capacity() const noexcept
{
return Capacity;
}
template <typename ElementType, uint64_t Capacity>
bool LockFreeQueue<ElementType, Capacity>::tryPush(const ElementType& value) noexcept
{
BufferIndex index;
if (!m_freeIndices.pop(index))
{
return false; // detected full queue, value is unchanged (as demanded by const)
}
writeBufferAt(index, value); // const& version is called
m_usedIndices.push(index);
return true; // value was copied into the queue and is unchanged
}
template <typename ElementType, uint64_t Capacity>
bool LockFreeQueue<ElementType, Capacity>::tryPush(ElementType&& value) noexcept
{
BufferIndex index;
if (!m_freeIndices.pop(index))
{
return false; // detected full queue
}
writeBufferAt(index, std::forward<ElementType>(value)); //&& version is called
m_usedIndices.push(index);
return true;
}
template <typename ElementType, uint64_t Capacity>
template <typename T>
iox::cxx::optional<ElementType> LockFreeQueue<ElementType, Capacity>::pushImpl(T&& value) noexcept
{
cxx::optional<ElementType> evictedValue;
BufferIndex index;
while (!m_freeIndices.pop(index))
{
// only pop the index if the queue is still full
// note, this leads to issues if an index is lost
// (only possible due to an application crash)
// then the queue can never be full and we may never leave if no one calls a concurrent pop
// A quick remedy is not to use a conditional pop such as popIfFull here, but a normal one.
// However, then it can happen that due to a concurrent pop it was not really necessary to
// evict a value (i.e. we may needlessly lose values in rare cases)
// Whether there is another acceptable solution needs to be explored.
if (m_usedIndices.popIfFull(index))
{
evictedValue = readBufferAt(index);
break;
}
// if m_usedIndices was not full we try again (m_freeIndices should contain an index in this case)
// note that it is theoretically possible to be unsuccessful indefinitely
// (and thus we would have an infinite loop)
// but this requires a timing of concurrent pushes and pops which is exceptionally unlikely in practice
}
// if we removed from a full queue via popIfFull it might not be full anymore when a concurrent pop occurs
writeBufferAt(index, value); //&& version is called due to explicit conversion via std::move
m_usedIndices.push(index);
return evictedValue; // value was moved into the queue, if a value was evicted to do so return it
}
template <typename ElementType, uint64_t Capacity>
iox::cxx::optional<ElementType> LockFreeQueue<ElementType, Capacity>::push(const ElementType& value) noexcept
{
return pushImpl(std::forward<const ElementType>(value));
}
template <typename ElementType, uint64_t Capacity>
iox::cxx::optional<ElementType> LockFreeQueue<ElementType, Capacity>::push(ElementType&& value) noexcept
{
return pushImpl(std::forward<ElementType>(value));
}
template <typename ElementType, uint64_t Capacity>
iox::cxx::optional<ElementType> LockFreeQueue<ElementType, Capacity>::pop() noexcept
{
BufferIndex index;
if (!m_usedIndices.pop(index))
{
return cxx::nullopt; // detected empty queue
}
auto result = readBufferAt(index);
m_freeIndices.push(index);
return result;
}
template <typename ElementType, uint64_t Capacity>
bool LockFreeQueue<ElementType, Capacity>::empty() const noexcept
{
return m_usedIndices.empty();
}
template <typename ElementType, uint64_t Capacity>
uint64_t LockFreeQueue<ElementType, Capacity>::size() const noexcept
{
return m_size.load(std::memory_order_relaxed);
}
template <typename ElementType, uint64_t Capacity>
cxx::optional<ElementType> LockFreeQueue<ElementType, Capacity>::readBufferAt(const BufferIndex& index)
{
// also used for buffer synchronization
m_size.fetch_sub(1u, std::memory_order_acquire);
auto& element = m_buffer[index];
cxx::optional<ElementType> result(std::move(element));
element.~ElementType();
return result;
}
template <typename ElementType, uint64_t Capacity>
template <typename T>
void LockFreeQueue<ElementType, Capacity>::writeBufferAt(const BufferIndex& index, T&& value)
{
auto elementPtr = m_buffer.ptr(index);
new (elementPtr) ElementType(std::forward<T>(value)); // move ctor invoked when available, copy ctor otherwise
// also used for buffer synchronization
m_size.fetch_add(1u, std::memory_order_release);
}
} // namespace concurrent
} // namespace iox
Updated on 31 May 2022 at 15:29:15 CEST