Skip to content

iceoryx_utils/internal/concurrent/taco.hpp🔗

Namespaces🔗

Name
iox
Building block to easily create free functions for logging in a library context.
iox::concurrent

Classes🔗

Name
class iox::concurrent::TACO
TACO is an acronym for Thread Aware exChange Ownership. Exchanging data between threads needs some synchronization mechanism. This can be done with a mutex or atomics. If the data structure is larger than 64 bits or if more than one value needs to be accessed in a synchronized manner, a mutex would be the only option. The TACO is a wait-free alternative to the mutex. Data can be exchanged between threads. The TACO is like a SoFi with one element, but with the possibility to read/write from multiple threads.

Source code🔗

// Copyright (c) 2019 by Robert Bosch GmbH. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0
#ifndef IOX_UTILS_CONCURRENT_TACO_HPP
#define IOX_UTILS_CONCURRENT_TACO_HPP

#include "iceoryx_utils/cxx/helplets.hpp"
#include "iceoryx_utils/cxx/optional.hpp"

#include <atomic>
#include <cstdint>

namespace iox
{
namespace concurrent
{
/// @brief Selects what a TACO does when a context receives the transaction it stored itself.
enum class TACOMode
{
    /// Data is handed out even if it was stored by the same context that now takes it.
    /// @note Historic misspelling; kept unchanged so that existing code keeps compiling.
    AccecptDataFromSameContext,
    /// Correctly spelled alias of AccecptDataFromSameContext; both enumerators have the same value.
    AcceptDataFromSameContext = AccecptDataFromSameContext,
    /// Data which was stored by the same context that now takes it is discarded and nothing is handed out.
    DenyDataFromSameContext
};

/// @brief Single-slot exchange buffer ("Thread Aware exChange Ownership") which hands data from one
///        context to another by swapping transaction indices through one atomic variable.
/// @tparam T type of the exchanged data; it is held in a cxx::optional<T>
/// @tparam Context enum class with underlying type uint32_t whose last enumerator is END_OF_LIST;
///         every enumerator before END_OF_LIST identifies one context
/// @tparam MaxNumberOfContext Context::END_OF_LIST must be strictly smaller than this value
/// @note NOTE(review): m_indices and m_transactions are plain (non-atomic) members, so presumably a
///       given Context value must only ever be used by one thread at a time - confirm with the callers.
template <typename T, typename Context, uint32_t MaxNumberOfContext = 500>
class TACO
{
  private:
    // One exchange slot: the payload plus the tag of the context which offered it last.
    struct Transaction
    {
        Context context{Context::END_OF_LIST};
        cxx::optional<T> data{cxx::nullopt_t()};
    };

    TACOMode m_mode;
    // index into m_transactions of the transaction which is currently offered for consumption
    std::atomic<uint32_t> m_pendingTransaction;

    static constexpr uint32_t NumberOfContext = static_cast<uint32_t>(Context::END_OF_LIST);
    // m_indices[context] is the index of the m_transactions element which is currently owned by that
    // context; therefore the context may safely access m_transactions[m_indices[context]]
    uint32_t m_indices[NumberOfContext];
    // one local transaction buffer per context plus one additional element,
    // which is the transaction that is ready for consumption
    Transaction m_transactions[NumberOfContext + 1];

  public:
    /// @brief Creates an empty TACO; the additional (last) transaction starts out as the pending one.
    /// @param[in] mode defines whether a context gets back the data it has stored itself
    TACO(TACOMode mode)
        : m_mode(mode)
        , m_pendingTransaction(NumberOfContext)
    {
        static_assert(std::is_enum<Context>::value, "TACO Context must be an enum class!");
        static_assert(std::is_convertible<Context, uint32_t>::value == false,
                      "TACO Context must be an enum class, not just an enum!");
        static_assert(std::is_same<uint32_t, typename std::underlying_type<Context>::type>::value,
                      "TACO Context underlying type must be uint32_t!");
        static_assert(static_cast<uint32_t>(Context::END_OF_LIST) < MaxNumberOfContext,
                      "TACO exceeded max number of contexts!");

        // in the beginning every context owns the transaction with the same index as the context itself
        for (uint32_t slot = 0; slot < NumberOfContext; ++slot)
        {
            m_indices[slot] = slot;
        }
    }

    TACO(const TACO&) = delete;
    TACO(TACO&&) = delete;
    TACO& operator=(const TACO&) = delete;
    TACO& operator=(TACO&&) = delete;

    /// @brief Offers a copy of data as the new pending transaction and takes the previously pending one.
    /// @param[in] data value which is copied into the transaction owned by the context
    /// @param[in] context the calling context; must be smaller than Context::END_OF_LIST
    /// @return the data of the previously pending transaction; nullopt if it carried no data or if it
    ///         was stored by the same context while the mode is DenyDataFromSameContext
    cxx::optional<T> exchange(const T& data, Context context)
    {
        cxx::Expects(context < Context::END_OF_LIST);
        Transaction& ownTransaction = m_transactions[m_indices[static_cast<uint32_t>(context)]];
        ownTransaction.data.emplace(data);
        return exchange(context);
    }

    /// @brief Takes the pending transaction without explicitly offering new data.
    /// @param[in] context the calling context; must be smaller than Context::END_OF_LIST
    /// @return same as exchange(const T&, Context)
    cxx::optional<T> take(const Context context)
    {
        cxx::Expects(context < Context::END_OF_LIST);
        // The transaction owned by the context is not cleared here. The private exchange is expected to
        // leave it empty already: it either moves the data out or resets it.
        // NOTE(review): this relies on a moved-from cxx::optional being empty - confirm.
        return exchange(context);
    }

    /// @brief Offers a copy of data as the new pending transaction and drops whatever was pending.
    /// @param[in] data value which is copied into the transaction owned by the context
    /// @param[in] context the calling context; must be smaller than Context::END_OF_LIST
    void store(const T& data, const Context context)
    {
        cxx::Expects(context < Context::END_OF_LIST);
        // the previously pending data is intentionally thrown away
        static_cast<void>(exchange(data, context));
    }

  private:
    /// @brief Swaps the transaction owned by the context with the pending transaction.
    /// @param[in] context the calling context
    /// @return the data found in the transaction which was pending, subject to the configured mode
    cxx::optional<T> exchange(const Context context)
    {
        const uint32_t slot = static_cast<uint32_t>(context);

        // tag the transaction we are about to publish, so a later reader can tell who offered it
        const uint32_t offeredIndex = m_indices[slot];
        m_transactions[offeredIndex].context = context;

        // publish our transaction and take over the one which was pending up to now
        const uint32_t acquiredIndex = m_pendingTransaction.exchange(offeredIndex, std::memory_order_acq_rel);
        m_indices[slot] = acquiredIndex;

        Transaction& acquired = m_transactions[acquiredIndex];

        // data which originates from the calling context itself is dropped unless the mode accepts it
        if (m_mode != TACOMode::AccecptDataFromSameContext && acquired.context == context)
        {
            acquired.data.reset();
            return cxx::nullopt_t();
        }

        return std::move(acquired.data);
    }
};
} // namespace concurrent
} // namespace iox

#endif // IOX_UTILS_CONCURRENT_TACO_HPP

Updated on 31 May 2022 at 15:29:15 CEST