Skip to content

iceoryx_posh/internal/roudi/introspection/port_introspection.inl🔗

Namespaces🔗

Name
iox
iox::roudi

Defines🔗

Name
IOX_POSH_ROUDI_INTROSPECTION_PORT_INTROSPECTION_INL

Macro Documentation🔗

define IOX_POSH_ROUDI_INTROSPECTION_PORT_INTROSPECTION_INL🔗

#define IOX_POSH_ROUDI_INTROSPECTION_PORT_INTROSPECTION_INL

Source code🔗

// Copyright (c) 2019 - 2020 by Robert Bosch GmbH. All rights reserved.
// Copyright (c) 2020 - 2021 by Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0
#ifndef IOX_POSH_ROUDI_INTROSPECTION_PORT_INTROSPECTION_INL
#define IOX_POSH_ROUDI_INTROSPECTION_PORT_INTROSPECTION_INL

#include "iceoryx_utils/posix_wrapper/thread.hpp"

namespace iox
{
namespace roudi
{
template <typename PublisherPort, typename SubscriberPort>
inline PortIntrospection<PublisherPort, SubscriberPort>::PortIntrospection() noexcept
{
}

template <typename PublisherPort, typename SubscriberPort>
inline PortIntrospection<PublisherPort, SubscriberPort>::PortData::PortData() noexcept
    : m_newData(true)
{
}

template <typename PublisherPort, typename SubscriberPort>
inline PortIntrospection<PublisherPort, SubscriberPort>::~PortIntrospection() noexcept
{
    stop();
}

template <typename PublisherPort, typename SubscriberPort>
inline void PortIntrospection<PublisherPort, SubscriberPort>::reportMessage(const capro::CaproMessage& message) noexcept
{
    m_portData.updateConnectionState(message);
}

template <typename PublisherPort, typename SubscriberPort>
inline void PortIntrospection<PublisherPort, SubscriberPort>::reportMessage(const capro::CaproMessage& message,
                                                                            const UniquePortId& id)
{
    m_portData.updateSubscriberConnectionState(message, id);
}

template <typename PublisherPort, typename SubscriberPort>
inline bool PortIntrospection<PublisherPort, SubscriberPort>::registerPublisherPort(
    PublisherPort&& publisherPortGeneric,
    PublisherPort&& publisherPortThroughput,
    PublisherPort&& publisherPortSubscriberPortsData) noexcept
{
    if (m_publisherPort || m_publisherPortThroughput || m_publisherPortSubscriberPortsData)
    {
        return false;
    }

    m_publisherPort.emplace(std::move(publisherPortGeneric));
    m_publisherPortThroughput.emplace(std::move(publisherPortThroughput));
    m_publisherPortSubscriberPortsData.emplace(std::move(publisherPortSubscriberPortsData));

    return true;
}

template <typename PublisherPort, typename SubscriberPort>
inline void PortIntrospection<PublisherPort, SubscriberPort>::run() noexcept
{
    cxx::Expects(m_publisherPort.has_value());
    cxx::Expects(m_publisherPortThroughput.has_value());
    cxx::Expects(m_publisherPortSubscriberPortsData.has_value());

    // this is a field, there needs to be a sample before activate is called
    sendPortData();
    sendThroughputData();
    sendSubscriberPortsData();
    m_publisherPort->offer();
    m_publisherPortThroughput->offer();
    m_publisherPortSubscriberPortsData->offer();

    m_publishingTask.start(m_sendInterval);
}

template <typename PublisherPort, typename SubscriberPort>
inline void PortIntrospection<PublisherPort, SubscriberPort>::send() noexcept
{
    if (m_portData.isNew())
    {
        sendPortData();
    }
    sendThroughputData();
    sendSubscriberPortsData();
}

template <typename PublisherPort, typename SubscriberPort>
inline void PortIntrospection<PublisherPort, SubscriberPort>::sendPortData() noexcept
{
    auto maybeChunkHeader = m_publisherPort->tryAllocateChunk(sizeof(PortIntrospectionFieldTopic),
                                                              alignof(PortIntrospectionFieldTopic),
                                                              CHUNK_NO_USER_HEADER_SIZE,
                                                              CHUNK_NO_USER_HEADER_ALIGNMENT);
    if (!maybeChunkHeader.has_error())
    {
        auto sample = static_cast<PortIntrospectionFieldTopic*>(maybeChunkHeader.value()->userPayload());
        new (sample) PortIntrospectionFieldTopic();

        m_portData.prepareTopic(*sample); // requires internal mutex (blocks
                                          // further introspection events)
        m_publisherPort->sendChunk(maybeChunkHeader.value());
    }
}

template <typename PublisherPort, typename SubscriberPort>
inline void PortIntrospection<PublisherPort, SubscriberPort>::sendThroughputData() noexcept
{
    auto maybeChunkHeader = m_publisherPortThroughput->tryAllocateChunk(sizeof(PortThroughputIntrospectionFieldTopic),
                                                                        alignof(PortThroughputIntrospectionFieldTopic),
                                                                        CHUNK_NO_USER_HEADER_SIZE,
                                                                        CHUNK_NO_USER_HEADER_ALIGNMENT);
    if (!maybeChunkHeader.has_error())
    {
        auto throughputSample =
            static_cast<PortThroughputIntrospectionFieldTopic*>(maybeChunkHeader.value()->userPayload());
        new (throughputSample) PortThroughputIntrospectionFieldTopic();

        m_portData.prepareTopic(*throughputSample); // requires internal mutex (blocks
        // further introspection events)
        m_publisherPortThroughput->sendChunk(maybeChunkHeader.value());
    }
}

template <typename PublisherPort, typename SubscriberPort>
inline void PortIntrospection<PublisherPort, SubscriberPort>::sendSubscriberPortsData() noexcept
{
    auto maybeChunkHeader =
        m_publisherPortSubscriberPortsData->tryAllocateChunk(sizeof(SubscriberPortChangingIntrospectionFieldTopic),
                                                             alignof(SubscriberPortChangingIntrospectionFieldTopic),
                                                             CHUNK_NO_USER_HEADER_SIZE,
                                                             CHUNK_NO_USER_HEADER_ALIGNMENT);
    if (!maybeChunkHeader.has_error())
    {
        auto subscriberPortChangingDataSample =
            static_cast<SubscriberPortChangingIntrospectionFieldTopic*>(maybeChunkHeader.value()->userPayload());
        new (subscriberPortChangingDataSample) SubscriberPortChangingIntrospectionFieldTopic();

        m_portData.prepareTopic(*subscriberPortChangingDataSample); // requires internal mutex (blocks
        // further introspection events)
        m_publisherPortSubscriberPortsData->sendChunk(maybeChunkHeader.value());
    }
}

template <typename PublisherPort, typename SubscriberPort>
inline void PortIntrospection<PublisherPort, SubscriberPort>::setSendInterval(const units::Duration interval) noexcept
{
    m_sendInterval = interval;
    if (m_publishingTask.isActive())
    {
        m_publishingTask.stop();
        m_publishingTask.start(m_sendInterval);
    }
}

template <typename PublisherPort, typename SubscriberPort>
inline void PortIntrospection<PublisherPort, SubscriberPort>::stop() noexcept
{
    m_publishingTask.stop();
}

template <typename PublisherPort, typename SubscriberPort>
inline bool PortIntrospection<PublisherPort, SubscriberPort>::PortData::updateConnectionState(
    const capro::CaproMessage& message) noexcept
{
    const capro::ServiceDescription& service = message.m_serviceDescription;
    capro::CaproMessageType messageType = message.m_type;

    std::lock_guard<std::mutex> lock(m_mutex);

    auto iter = m_connectionMap.find(service);
    if (iter == m_connectionMap.end())
    {
        return false; // no corresponding capro Id ...
    }

    auto& innerConnectionMap = iter->second;

    for (auto& pair : innerConnectionMap)
    {
        auto& connection = m_connectionContainer[pair.second];
        connection.state = getNextState<iox::build::CommunicationPolicy>(connection.state, messageType);
    }

    setNew(true);
    return true;
}

template <typename PublisherPort, typename SubscriberPort>
inline bool PortIntrospection<PublisherPort, SubscriberPort>::PortData::updateSubscriberConnectionState(
    const capro::CaproMessage& message, const UniquePortId& id)
{
    const capro::ServiceDescription& service = message.m_serviceDescription;
    capro::CaproMessageType messageType = message.m_type;

    std::lock_guard<std::mutex> lock(m_mutex);

    auto iter = m_connectionMap.find(service);
    if (iter == m_connectionMap.end())
    {
        return false; // no corresponding capro Id ...
    }

    auto& innerConnectionMap = iter->second;

    auto iterInnerMap = innerConnectionMap.find(id);
    if (iterInnerMap == innerConnectionMap.end())
    {
        return false;
    }

    auto& connection = m_connectionContainer[iterInnerMap->second];
    connection.state = getNextState<iox::build::CommunicationPolicy>(connection.state, messageType);

    setNew(true);
    return true;
}

template <typename PublisherPort, typename SubscriberPort>
inline bool
PortIntrospection<PublisherPort, SubscriberPort>::PortData::addPublisher(typename PublisherPort::MemberType_t& port)
{
    std::lock_guard<std::mutex> lock(m_mutex);

    auto service = port.m_serviceDescription;
    auto uniqueId = port.m_uniqueId;

    auto index = m_publisherContainer.add(PublisherInfo(port));
    if (index < 0)
    {
        return false;
    }

    auto iter = m_publisherMap.find(service);
    if (iter == m_publisherMap.end())
    {
        // service is new, create new map
        std::map<UniquePortId, typename PublisherContainer::Index_t> innerPublisherMap;
        innerPublisherMap.insert(std::make_pair(uniqueId, index));
        m_publisherMap.insert(std::make_pair(service, innerPublisherMap));
    }
    else
    {
        // service exists, add new entry
        auto& innerPublisherMap = iter->second;
        auto iter = innerPublisherMap.find(uniqueId);
        if (iter == innerPublisherMap.end())
        {
            innerPublisherMap.insert(std::make_pair(uniqueId, index));
        }
        else
        {
            return false;
        }
    }

    // connect publisher to all subscribers with the same Id
    PublisherInfo* publisher = m_publisherContainer.get(index);

    // find corresponding subscribers
    auto connIter = m_connectionMap.find(service);
    if (connIter != m_connectionMap.end())
    {
        auto& innerConnectionMap = connIter->second;
        for (auto& pair : innerConnectionMap)
        {
            auto& connection = m_connectionContainer[pair.second];
            if (service == connection.subscriberInfo.service)
            {
                connection.publisherInfo = publisher;
            }
        }
    }

    setNew(true);
    return true;
}

template <typename PublisherPort, typename SubscriberPort>
inline bool PortIntrospection<PublisherPort, SubscriberPort>::PortData::addSubscriber(
    typename SubscriberPort::MemberType_t& portData)
{
    std::lock_guard<std::mutex> lock(m_mutex);

    auto service = portData.m_serviceDescription;
    auto uniqueId = portData.m_uniqueId;

    auto index = m_connectionContainer.add(ConnectionInfo(portData));
    if (index < 0)
    {
        return false;
    }

    auto iter = m_connectionMap.find(service);

    if (iter == m_connectionMap.end())
    {
        // service is new, create new map
        std::map<UniquePortId, typename ConnectionContainer::Index_t> innerConnectionMap;
        innerConnectionMap.insert(std::make_pair(uniqueId, index));
        m_connectionMap.insert(std::make_pair(service, innerConnectionMap));
    }
    else
    {
        // service exists, add new entry
        auto& innerConnectionMap = iter->second;
        auto iter = innerConnectionMap.find(uniqueId);
        if (iter == innerConnectionMap.end())
        {
            innerConnectionMap.insert(std::make_pair(uniqueId, index));
        }
        else
        {
            return false;
        }
    }

    auto& connection = m_connectionContainer[index];

    auto sendIter = m_publisherMap.find(service);
    if (sendIter != m_publisherMap.end())
    {
        auto& innerPublisherMap = sendIter->second;
        for (auto& iter : innerPublisherMap)
        {
            auto publisher = m_publisherContainer.get(iter.second);
            connection.publisherInfo = publisher; // set corresponding publisher if exists
        }
    }

    return true;
}

template <typename PublisherPort, typename SubscriberPort>
inline bool PortIntrospection<PublisherPort, SubscriberPort>::PortData::removePublisher(const PublisherPort& port)
{
    std::lock_guard<std::mutex> lock(m_mutex);

    auto iter = m_publisherMap.find(port.getCaProServiceDescription());
    if (iter == m_publisherMap.end())
    {
        return false;
    }

    auto& innerPublisherMap = iter->second;
    auto iterInnerMap = innerPublisherMap.find(port.getUniqueID());
    if (iterInnerMap == innerPublisherMap.end())
    {
        return false;
    }
    auto m_publisherIndex = iterInnerMap->second;
    auto& publisher = m_publisherContainer[m_publisherIndex];

    // disconnect publisher from all its subscribers
    for (auto& pair : publisher.connectionMap)
    {
        pair.second->publisherInfo = nullptr;          // publisher is disconnected
        pair.second->state = ConnectionState::DEFAULT; // connection state is now default
    }

    innerPublisherMap.erase(iterInnerMap);
    m_publisherContainer.remove(m_publisherIndex);
    setNew(true); // indicates we have to send new data because
                  // something changed

    return true;
}

template <typename PublisherPort, typename SubscriberPort>
inline bool PortIntrospection<PublisherPort, SubscriberPort>::PortData::removeSubscriber(const SubscriberPort& port)
{
    std::lock_guard<std::mutex> lock(m_mutex);

    auto iter = m_connectionMap.find(port.getCaProServiceDescription());
    if (iter == m_connectionMap.end())
    {
        return false; // not found and therefore not removed
    }

    auto& innerConnectionMap = iter->second;
    auto mapIter = innerConnectionMap.find(port.getUniqueID());

    if (mapIter == innerConnectionMap.end())
    {
        return false; // not found and therefore not removed
    }

    // remove subscriber in corresponding publisher
    auto connectionIndex = mapIter->second;
    auto& connection = m_connectionContainer[connectionIndex];
    auto& publisher = connection.publisherInfo;

    if (publisher)
    {
        auto connIter = publisher->connectionMap.find(connectionIndex);
        if (connIter != publisher->connectionMap.end())
        {
            publisher->connectionMap.erase(connIter);
        }
    }

    innerConnectionMap.erase(mapIter);
    m_connectionContainer.remove(connectionIndex);

    setNew(true);
    return true;
}

template <typename PublisherPort, typename SubscriberPort>
template <typename T, std::enable_if_t<std::is_same<T, iox::build::OneToManyPolicy>::value>*>
inline typename PortIntrospection<PublisherPort, SubscriberPort>::ConnectionState
PortIntrospection<PublisherPort, SubscriberPort>::PortData::getNextState(ConnectionState currentState,
                                                                         capro::CaproMessageType messageType) noexcept
{
    ConnectionState nextState = currentState; // stay in currentState as default transition

    // publisher and subscriber can only send a subset of messages (e.g. no
    // sub request from publisher), so it is not required to check whether
    // publisher or subscriber has sent the message...

    switch (currentState)
    {
    case ConnectionState::DEFAULT:
    {
        if (messageType == capro::CaproMessageType::SUB)
        {
            nextState = ConnectionState::SUB_REQUESTED;
        }
        break;
    }

    case ConnectionState::SUB_REQUESTED:
    {
        if (messageType == capro::CaproMessageType::ACK)
        {
            nextState = ConnectionState::CONNECTED;
        }
        else if (messageType == capro::CaproMessageType::NACK)
        {
            nextState = ConnectionState::DEFAULT;
        }
        break;
    }

    case ConnectionState::CONNECTED:
    {
        if (messageType == capro::CaproMessageType::STOP_OFFER)
        {
            nextState = ConnectionState::DEFAULT;
        }
        else if (messageType == capro::CaproMessageType::UNSUB)
        {
            nextState = ConnectionState::DEFAULT;
        }
        break;
    }

    default:
    { // stay in current state
    }
    };

    return nextState;
}

template <typename PublisherPort, typename SubscriberPort>
template <typename T, std::enable_if_t<std::is_same<T, iox::build::ManyToManyPolicy>::value>*>
inline typename PortIntrospection<PublisherPort, SubscriberPort>::ConnectionState
PortIntrospection<PublisherPort, SubscriberPort>::PortData::getNextState(ConnectionState currentState,
                                                                         capro::CaproMessageType messageType) noexcept
{
    ConnectionState nextState = currentState; // stay in currentState as default transition

    switch (currentState)
    {
    case ConnectionState::DEFAULT:
    {
        if (messageType == capro::CaproMessageType::SUB)
        {
            nextState = ConnectionState::CONNECTED;
        }
        break;
    }

    case ConnectionState::CONNECTED:
    {
        if (messageType == capro::CaproMessageType::UNSUB)
        {
            nextState = ConnectionState::DEFAULT;
        }
        break;
    }

    default:
    { // stay in current state
    }
    };

    return nextState;
}

template <typename PublisherPort, typename SubscriberPort>
inline void
PortIntrospection<PublisherPort, SubscriberPort>::PortData::prepareTopic(PortIntrospectionTopic& topic) noexcept
{
    auto& m_publisherList = topic.m_publisherList;

    std::lock_guard<std::mutex> lock(m_mutex); // we need to lock the internal data structs

    int32_t index{0};
    for (auto& pub : m_publisherMap)
    {
        auto& innerPublisherMap = pub.second;
        for (auto& pair : innerPublisherMap)
        {
            auto m_publisherIndex = pair.second;
            if (m_publisherIndex >= 0)
            {
                auto& publisherInfo = m_publisherContainer[m_publisherIndex];
                PublisherPortData publisherData;
                PublisherPort port(publisherInfo.portData);
                publisherData.m_publisherPortID = static_cast<uint64_t>(port.getUniqueID());
                publisherData.m_sourceInterface = publisherInfo.service.getSourceInterface();
                publisherData.m_name = publisherInfo.process;
                publisherData.m_node = publisherInfo.node;

                publisherData.m_caproInstanceID = publisherInfo.service.getInstanceIDString();
                publisherData.m_caproServiceID = publisherInfo.service.getServiceIDString();
                publisherData.m_caproEventMethodID = publisherInfo.service.getEventIDString();

                m_publisherList.emplace_back(publisherData);
                publisherInfo.index = index++;
            }
        }
    }

    auto& m_subscriberList = topic.m_subscriberList;
    for (auto& connPair : m_connectionMap)
    {
        for (auto& pair : connPair.second)
        {
            auto connectionIndex = pair.second;
            if (connectionIndex >= 0)
            {
                auto& connection = m_connectionContainer[connectionIndex];
                SubscriberPortData subscriberData;
                auto& subscriberInfo = connection.subscriberInfo;

                subscriberData.m_name = subscriberInfo.process;
                subscriberData.m_node = subscriberInfo.node;

                subscriberData.m_caproInstanceID = subscriberInfo.service.getInstanceIDString();
                subscriberData.m_caproServiceID = subscriberInfo.service.getServiceIDString();
                subscriberData.m_caproEventMethodID = subscriberInfo.service.getEventIDString();
                m_subscriberList.emplace_back(subscriberData);
            }
        }
    }

    // needs to be done while holding the lock
    setNew(false);
}

template <typename PublisherPort, typename SubscriberPort>
inline void PortIntrospection<PublisherPort, SubscriberPort>::PortData::prepareTopic(
    PortThroughputIntrospectionTopic& topic IOX_MAYBE_UNUSED) noexcept
{
}

template <typename PublisherPort, typename SubscriberPort>
inline void PortIntrospection<PublisherPort, SubscriberPort>::PortData::prepareTopic(
    SubscriberPortChangingIntrospectionFieldTopic& topic) noexcept
{
    std::lock_guard<std::mutex> lock(m_mutex);
    for (auto& connPair : m_connectionMap)
    {
        for (auto& pair : connPair.second)
        {
            auto connectionIndex = pair.second;
            if (connectionIndex >= 0)
            {
                auto& connection = m_connectionContainer[connectionIndex];
                auto& subscriberInfo = connection.subscriberInfo;
                SubscriberPortChangingData subscriberData;
                if (subscriberInfo.portData != nullptr)
                {
                    SubscriberPort port(subscriberInfo.portData);
                    subscriberData.subscriptionState = port.getSubscriptionState();

                    // subscriberData.fifoCapacity = port .getDeliveryFiFoCapacity();
                    // subscriberData.fifoSize = port.getDeliveryFiFoSize();
                    subscriberData.propagationScope = port.getCaProServiceDescription().getScope();
                }
                else
                {
                    subscriberData.fifoCapacity = 0u;
                    subscriberData.fifoSize = 0u;
                    subscriberData.subscriptionState = iox::SubscribeState::NOT_SUBSCRIBED;
                    subscriberData.propagationScope = capro::Scope::INVALID;
                }
                topic.subscriberPortChangingDataList.push_back(subscriberData);
            }
        }
    }
}

template <typename PublisherPort, typename SubscriberPort>
inline bool PortIntrospection<PublisherPort, SubscriberPort>::PortData::isNew() const noexcept
{
    return m_newData.load(std::memory_order_acquire);
}

template <typename PublisherPort, typename SubscriberPort>
inline void PortIntrospection<PublisherPort, SubscriberPort>::PortData::setNew(bool value) noexcept
{
    m_newData.store(value, std::memory_order_release);
}

template <typename PublisherPort, typename SubscriberPort>
inline bool PortIntrospection<PublisherPort, SubscriberPort>::addPublisher(typename PublisherPort::MemberType_t& port)
{
    return m_portData.addPublisher(port);
}

template <typename PublisherPort, typename SubscriberPort>
inline bool PortIntrospection<PublisherPort, SubscriberPort>::addSubscriber(typename SubscriberPort::MemberType_t& port)
{
    return m_portData.addSubscriber(port);
}

template <typename PublisherPort, typename SubscriberPort>
inline bool PortIntrospection<PublisherPort, SubscriberPort>::removePublisher(const PublisherPort& port)
{
    return m_portData.removePublisher(port);
}

template <typename PublisherPort, typename SubscriberPort>
inline bool PortIntrospection<PublisherPort, SubscriberPort>::removeSubscriber(const SubscriberPort& port)
{
    return m_portData.removeSubscriber(port);
}

} // namespace roudi
} // namespace iox

#endif // IOX_POSH_ROUDI_INTROSPECTION_PORT_INTROSPECTION_INL

Updated on 26 April 2021 at 15:31:02 CEST