Skip to content

iceoryx_posh/internal/roudi/introspection/port_introspection.hpp🔗

Namespaces🔗

Name
iox
iox::roudi

Classes🔗

Name
class iox::roudi::PortIntrospection
This class handles the port introspection for RouDi. It is recommended to use the PortIntrospectionType alias, which sets the template parameters intended for the actual introspection. The class manages a thread that periodically updates a field with port introspection data to which clients may subscribe.

Source code🔗

// Copyright (c) 2019 - 2020 by Robert Bosch GmbH. All rights reserved.
// Copyright (c) 2020 - 2021 by Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0
#ifndef IOX_POSH_ROUDI_INTROSPECTION_PORT_INTROSPECTION_HPP
#define IOX_POSH_ROUDI_INTROSPECTION_PORT_INTROSPECTION_HPP

#include "fixed_size_container.hpp"
#include "iceoryx_posh/iceoryx_posh_types.hpp"
#include "iceoryx_posh/internal/popo/ports/publisher_port_data.hpp"
#include "iceoryx_posh/internal/popo/ports/publisher_port_user.hpp"
#include "iceoryx_posh/mepoo/chunk_header.hpp"
#include "iceoryx_posh/roudi/introspection_types.hpp"
#include "iceoryx_utils/cxx/helplets.hpp"
#include "iceoryx_utils/cxx/method_callback.hpp"
#include "iceoryx_utils/internal/concurrent/periodic_task.hpp"

#include <atomic>
#include <mutex>
#include <vector>

#include <map>

namespace iox
{
namespace roudi
{
/// @brief Handles the port introspection for RouDi. The class owns a periodic task
///        (m_publishingTask) whose callback is send(), and three optional publisher ports
///        through which the introspection data is meant to be published.
/// @tparam PublisherPort publisher port type; must provide a nested MemberType_t and be
///         storable in a cxx::optional
/// @tparam SubscriberPort subscriber port type; must provide a nested MemberType_t
/// @note Only declarations live here; the member function definitions are expected in
///       port_introspection.inl, which is included at the end of this header.
/// @note The class is neither copyable nor movable (all four operations are deleted).
template <typename PublisherPort, typename SubscriberPort>
class PortIntrospection
{
  private:
    /// @brief State stored per subscriber connection (see ConnectionInfo::state).
    /// @note The transitions are computed by PortData::getNextState(), which is not visible
    ///       here. Presumably DEFAULT = not connected, SUB_REQUESTED = subscription requested
    ///       but not yet confirmed, CONNECTED = established - TODO confirm in the .inl.
    enum class ConnectionState
    {
        DEFAULT,
        SUB_REQUESTED,
        CONNECTED
    };

    /// Short local names for the topic types filled by PortData::prepareTopic().
    using PortIntrospectionTopic = PortIntrospectionFieldTopic;
    using PortThroughputIntrospectionTopic = PortThroughputIntrospectionFieldTopic;

    /// @brief Bookkeeping of the known publishers and subscriber connections, together with a
    ///        "new data" flag and a mutex.
    /// @note NOTE(review): m_mutex presumably serializes access to the containers and maps
    ///       below; the locking itself happens in the out-of-view definitions - confirm.
    class PortData
    {
      private:

        // forward declaration: PublisherInfo stores pointers to ConnectionInfo
        struct ConnectionInfo;

        /// @brief Snapshot of a publisher port plus per-publisher bookkeeping.
        struct PublisherInfo
        {
            /// @brief Creates an empty entry; all members keep their in-class defaults.
            /// @note NOTE(review): user-provided empty body, whereas SubscriberInfo and
            ///       ConnectionInfo use `= default`. Possibly deliberate (e.g. because of the
            ///       std::map member combined with the noexcept specification) - confirm
            ///       before unifying.
            PublisherInfo() noexcept
            {
            }

            /// @brief Stores a pointer to the given port data and copies its runtime name,
            ///        service description and node name.
            /// @param[in] portData port data of the publisher; only its address is kept, so
            ///            it has to outlive this entry
            /// @note The parameter shadows the member of the same name; inside the
            ///       initializer list `portData(&portData)` initializes the member with the
            ///       address of the argument.
            PublisherInfo(typename PublisherPort::MemberType_t& portData)
                : portData(&portData)
                , process(portData.m_runtimeName)
                , service(portData.m_serviceDescription)
                , node(portData.m_nodeName)
            {
            }

            /// non-owning pointer to the port data, nullptr for a default-constructed entry
            typename PublisherPort::MemberType_t* portData{nullptr};
            RuntimeName_t process;
            capro::ServiceDescription service;
            NodeName_t node;

            using TimePointNs_t = mepoo::TimePointNs_t;
            using DurationNs_t = mepoo::DurationNs_t;
            /// presumably timestamp and value of the last observed sequence number, used for
            /// the throughput topic - TODO confirm in the .inl
            TimePointNs_t m_sequenceNumberTimestamp{DurationNs_t(0)};
            mepoo::SequenceNumber_t m_sequenceNumber{0U};

            /// non-owning pointers to connection entries, keyed by an int (presumably the
            /// index of the entry in the connection container - TODO confirm)
            std::map<int, ConnectionInfo*> connectionMap;
            /// -1 until a value is assigned (presumably the index in the publisher container)
            int index{-1};
        };

        /// @brief Snapshot of a subscriber port.
        struct SubscriberInfo
        {
            SubscriberInfo() noexcept = default;

            /// @brief Stores a pointer to the given port data and copies its runtime name,
            ///        service description and node name.
            /// @param[in] portData port data of the subscriber; only its address is kept, so
            ///            it has to outlive this entry
            SubscriberInfo(typename SubscriberPort::MemberType_t& portData)
                : portData(&portData)
                , process(portData.m_runtimeName)
                , service(portData.m_serviceDescription)
                , node(portData.m_nodeName)
            {
            }

            /// non-owning pointer to the port data, nullptr for a default-constructed entry
            typename SubscriberPort::MemberType_t* portData{nullptr};
            RuntimeName_t process;
            capro::ServiceDescription service;
            NodeName_t node;
        };

        /// @brief A subscriber together with the publisher it is associated with (if any) and
        ///        the state of that connection.
        struct ConnectionInfo
        {
            ConnectionInfo() noexcept = default;

            /// @brief Builds the contained SubscriberInfo from the given subscriber port
            ///        data; the state starts as ConnectionState::DEFAULT.
            /// @param[in] portData port data of the subscriber
            ConnectionInfo(typename SubscriberPort::MemberType_t& portData)
                : subscriberInfo(portData)
                , state(ConnectionState::DEFAULT)
            {
            }

            /// @brief Copies the given SubscriberInfo; the state starts as
            ///        ConnectionState::DEFAULT.
            /// @param[in] subscriberInfo subscriber entry to copy
            ConnectionInfo(SubscriberInfo& subscriberInfo) noexcept
                : subscriberInfo(subscriberInfo)
                , state(ConnectionState::DEFAULT)
            {
            }

            SubscriberInfo subscriberInfo;
            /// non-owning pointer to the associated publisher entry, nullptr if there is none
            PublisherInfo* publisherInfo{nullptr};
            ConnectionState state{ConnectionState::DEFAULT};

            /// @brief Checks whether this connection is established.
            /// @return true only if a publisher entry is set and the state is CONNECTED
            bool isConnected() const noexcept
            {
                return publisherInfo && state == ConnectionState::CONNECTED;
            }
        };

      public:
        /// @brief Constructor, defined out of line.
        /// @note NOTE(review): m_newData has no in-class initializer, so this constructor is
        ///       expected to initialize it - confirm in the .inl.
        PortData() noexcept;

        /// @brief Registers the given publisher port data for introspection.
        /// @param[in] port port data of the publisher
        /// @return bool; presumably true if the publisher was added - TODO confirm
        bool addPublisher(typename PublisherPort::MemberType_t& port);

        /// @brief Registers the given subscriber port data for introspection.
        /// @param[in] portData port data of the subscriber
        /// @return bool; presumably true if the subscriber was added - TODO confirm
        bool addSubscriber(typename SubscriberPort::MemberType_t& portData);

        /// @brief Removes a previously registered publisher.
        /// @param[in] port publisher port to remove
        /// @return bool; presumably true if the publisher was found and removed - TODO confirm
        bool removePublisher(const PublisherPort& port);

        /// @brief Removes a previously registered subscriber.
        /// @param[in] port subscriber port to remove
        /// @return bool; presumably true if the subscriber was found and removed - TODO confirm
        bool removeSubscriber(const SubscriberPort& port);

        /// @brief Updates the stored connection state(s) from a CaPro message.
        /// @param[in] message CaPro message to evaluate
        /// @return bool; presumably true if a matching entry was updated - TODO confirm
        bool updateConnectionState(const capro::CaproMessage& message) noexcept;

        /// @brief Updates the connection state of the subscriber with the given id from a
        ///        CaPro message.
        /// @param[in] message CaPro message to evaluate
        /// @param[in] id unique port id of the subscriber
        /// @return bool; presumably true if a matching entry was updated - TODO confirm
        /// @note Unlike updateConnectionState(), this overload is not declared noexcept.
        bool updateSubscriberConnectionState(const capro::CaproMessage& message, const UniquePortId& id);

        /// @brief Fills the port introspection topic from the stored data.
        /// @param[out] topic topic to fill
        void prepareTopic(PortIntrospectionTopic& topic) noexcept;

        /// @brief Fills the port throughput introspection topic from the stored data.
        /// @param[out] topic topic to fill
        void prepareTopic(PortThroughputIntrospectionTopic& topic) noexcept;

        /// @brief Fills the subscriber port changing introspection topic from the stored data.
        /// @param[out] topic topic to fill
        void prepareTopic(SubscriberPortChangingIntrospectionFieldTopic& topic) noexcept;

        /// @brief Computes the follow-up connection state; this overload only participates in
        ///        overload resolution when T is iox::build::OneToManyPolicy.
        /// @tparam T communication policy used to select the overload
        /// @param[in] currentState state before the message is applied
        /// @param[in] messageType type of the received CaPro message
        /// @return the resulting connection state
        /// @note NOTE(review): std::enable_if_t and std::is_same need <type_traits>, which
        ///       this header does not include directly; it appears to rely on a transitive
        ///       include - consider including it explicitly.
        template <typename T, std::enable_if_t<std::is_same<T, iox::build::OneToManyPolicy>::value>* = nullptr>
        PortIntrospection::ConnectionState getNextState(ConnectionState currentState,
                                                        capro::CaproMessageType messageType) noexcept;

        /// @brief Computes the follow-up connection state; this overload only participates in
        ///        overload resolution when T is iox::build::ManyToManyPolicy.
        /// @tparam T communication policy used to select the overload
        /// @param[in] currentState state before the message is applied
        /// @param[in] messageType type of the received CaPro message
        /// @return the resulting connection state
        template <typename T, std::enable_if_t<std::is_same<T, iox::build::ManyToManyPolicy>::value>* = nullptr>
        PortIntrospection::ConnectionState getNextState(ConnectionState currentState,
                                                        capro::CaproMessageType messageType) noexcept;

        /// @brief Getter for the "new data" flag (presumably backed by m_newData - confirm).
        /// @return current value of the flag
        bool isNew() const noexcept;

        /// @brief Setter for the "new data" flag (presumably backed by m_newData - confirm).
        /// @param[in] value new value of the flag
        void setNew(bool value) noexcept;

      private:
        /// fixed-size storage for the entries; the second template argument is presumably the
        /// capacity (MAX_PUBLISHERS / MAX_SUBSCRIBERS) - see fixed_size_container.hpp
        using PublisherContainer = FixedSizeContainer<PublisherInfo, MAX_PUBLISHERS>;
        using ConnectionContainer = FixedSizeContainer<ConnectionInfo, MAX_SUBSCRIBERS>;

        /// service description -> (unique port id -> index into the publisher container)
        std::map<capro::ServiceDescription, std::map<UniquePortId, typename PublisherContainer::Index_t>>
            m_publisherMap;

        /// service description -> (unique port id -> index into the connection container)
        std::map<capro::ServiceDescription, std::map<UniquePortId, typename ConnectionContainer::Index_t>>
            m_connectionMap;

        PublisherContainer m_publisherContainer;
        ConnectionContainer m_connectionContainer;

        /// atomic flag; declared without an initializer, see the note on PortData()
        std::atomic<bool> m_newData;
        std::mutex m_mutex;
    };

    // end of helper classes

  public:
    /// @brief Constructor, defined out of line.
    PortIntrospection() noexcept;

    /// @brief Destructor, defined out of line.
    ~PortIntrospection() noexcept;

    // delete copy constructor and assignment operator
    PortIntrospection(PortIntrospection const&) = delete;
    PortIntrospection& operator=(PortIntrospection const&) = delete;
    // delete move constructor and assignment operator
    PortIntrospection(PortIntrospection&&) = delete;
    PortIntrospection& operator=(PortIntrospection&&) = delete;

    /// @brief Registers the given publisher port data for introspection.
    /// @param[in] port port data of the publisher
    /// @return bool; presumably forwarded from PortData::addPublisher - TODO confirm
    bool addPublisher(typename PublisherPort::MemberType_t& port);

    /// @brief Registers the given subscriber port data for introspection.
    /// @param[in] port port data of the subscriber
    /// @return bool; presumably forwarded from PortData::addSubscriber - TODO confirm
    bool addSubscriber(typename SubscriberPort::MemberType_t& port);

    /// @brief Removes a previously registered publisher.
    /// @param[in] port publisher port to remove
    /// @return bool; presumably forwarded from PortData::removePublisher - TODO confirm
    bool removePublisher(const PublisherPort& port);

    /// @brief Removes a previously registered subscriber.
    /// @param[in] port subscriber port to remove
    /// @return bool; presumably forwarded from PortData::removeSubscriber - TODO confirm
    bool removeSubscriber(const SubscriberPort& port);

    /// @brief Reports a CaPro message to the introspection so that it can track connection
    ///        state changes.
    /// @param[in] message CaPro message to report
    void reportMessage(const capro::CaproMessage& message) noexcept;

    /// @brief Reports a CaPro message that concerns the subscriber with the given id.
    /// @param[in] message CaPro message to report
    /// @param[in] id unique port id of the subscriber
    /// @note Unlike the single-argument overload, this one is not declared noexcept.
    void reportMessage(const capro::CaproMessage& message, const UniquePortId& id);

    /// @brief Hands over the three publisher ports used for sending the introspection data.
    /// @param[in] publisherPortGeneric port for the generic port data (presumably stored in
    ///            m_publisherPort - confirm)
    /// @param[in] publisherPortThroughput port for the throughput data (presumably stored in
    ///            m_publisherPortThroughput - confirm)
    /// @param[in] publisherPortSubscriberPortsData port for the subscriber port data
    ///            (presumably stored in m_publisherPortSubscriberPortsData - confirm)
    /// @return bool; presumably true if the ports were registered - TODO confirm
    bool registerPublisherPort(PublisherPort&& publisherPortGeneric,
                               PublisherPort&& publisherPortThroughput,
                               PublisherPort&& publisherPortSubscriberPortsData) noexcept;

    /// @brief Sets the interval between two sends (presumably stored in m_sendInterval, whose
    ///        default is one second - confirm).
    /// @param[in] interval new send interval
    void setSendInterval(const units::Duration interval) noexcept;


    /// @brief Starts the periodic sending (presumably by starting m_publishingTask - confirm).
    void run() noexcept;

    /// @brief Stops the periodic sending (presumably by stopping m_publishingTask - confirm).
    void stop() noexcept;

  protected:
    /// @brief Sends the port data (presumably via m_publisherPort - confirm).
    void sendPortData() noexcept;

    /// @brief Sends the throughput data (presumably via m_publisherPortThroughput - confirm).
    void sendThroughputData() noexcept;

    /// @brief Sends the subscriber port data (presumably via
    ///        m_publisherPortSubscriberPortsData - confirm).
    void sendSubscriberPortsData() noexcept;

    /// @brief Callback bound to m_publishingTask; presumably triggers the three send methods
    ///        above - confirm in the .inl.
    void send() noexcept;

    // NOTE(review): redundant access specifier, the section above is already protected
  protected:
    /// publisher ports for the introspection topics; empty optionals until ports are provided
    /// (presumably by registerPublisherPort - confirm)
    cxx::optional<PublisherPort> m_publisherPort;
    cxx::optional<PublisherPort> m_publisherPortThroughput;
    cxx::optional<PublisherPort> m_publisherPortSubscriberPortsData;

  private:
    PortData m_portData;

    /// send interval, one second by default
    units::Duration m_sendInterval{units::Duration::fromSeconds(1U)};
    /// periodic task that invokes send() on this object; constructed with the
    /// PeriodicTaskManualStart tag and the name "PortIntr" (presumably meaning it is not
    /// started on construction and "PortIntr" is the thread name - confirm in periodic_task.hpp)
    concurrent::PeriodicTask<cxx::MethodCallback<void>> m_publishingTask{
        concurrent::PeriodicTaskManualStart, "PortIntr", *this, &PortIntrospection::send};
};

/// @brief Recommended instantiation of PortIntrospection, using PublisherPortUserType as the
///        publisher port type and SubscriberPortUserType as the subscriber port type.
using PortIntrospectionType = PortIntrospection<PublisherPortUserType, SubscriberPortUserType>;

} // namespace roudi
} // namespace iox

#include "port_introspection.inl"

#endif // IOX_POSH_ROUDI_INTROSPECTION_PORT_INTROSPECTION_HPP

Updated on 17 June 2021 at 11:15:27 CEST