Program Listing for File graph_cache.hpp

Return to documentation for file (src/detail/graph_cache.hpp)

// Copyright 2023 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef DETAIL__GRAPH_CACHE_HPP_
#define DETAIL__GRAPH_CACHE_HPP_

#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "event.hpp"
#include "liveliness_utils.hpp"

#include "rcutils/allocator.h"
#include "rcutils/types.h"

#include "rmw/rmw.h"
#include "rmw/get_topic_endpoint_info.h"
#include "rmw/names_and_types.h"

#include <zenoh/api/id.hxx>

namespace rmw_zenoh_cpp
{
// TODO(Yadunund): Consider changing this to an array of unordered_set where the index of the
// array corresponds to the EntityType enum. This way we don't need to mix
// pub/sub with client/service.
// Forward declaration so that TopicDataPtr can be used inside TopicData itself.
class TopicData;
using TopicDataPtr = std::shared_ptr<TopicData>;
// Groups the entities that sit on the two sides of one topic (or service) entry:
// publishers/clients are kept in pubs_ and subscriptions/services in subs_.
// The constructor is private, so instances are obtained through TopicData::make().
class TopicData
{
public:
  // Topic information associated with this entry.
  // NOTE(review): presumably taken from the entity passed to make() — confirm in the .cpp.
  liveliness::TopicInfo info_;

  // Set of shared entity handles. With the default hash/equality this de-duplicates
  // on the pointer value unless liveliness provides std::hash/operator== specializations
  // for ConstEntityPtr — NOTE(review): confirm which one applies.
  using EntitySet = std::unordered_set<
    liveliness::ConstEntityPtr>;

  // The publishers or clients entities.
  EntitySet pubs_;

  // The subscriptions or services entities
  EntitySet subs_;

  // Factory for TopicData.
  // Returns nullptr if the entity does not contain topic_info.
  static TopicDataPtr make(liveliness::ConstEntityPtr entity);

private:
  // Private constructor to force users to rely on make.
  explicit TopicData(liveliness::ConstEntityPtr entity);
};

// Plain aggregate describing one node in the graph together with the
// pub/sub and service/client entries that belong to it.
struct GraphNode
{
  // NOTE(review): field meanings below are inferred from their names — presumably the
  // Zenoh session id (zid_), the node id (nid_), the node namespace (ns_) and the
  // node name (name_); confirm against liveliness::Entity.
  std::string zid_;
  std::string nid_;
  std::string ns_;
  std::string name_;
  // TODO(Yadunund): Should enclave be the parent to the namespace key and not within a Node?
  std::string enclave_;

  // The three aliases below nest as: topic name -> topic type -> QoS -> TopicData.

  // Map QoS (serialized using liveliness::qos_to_keyexpr) to TopicData
  using TopicQoSMap = std::unordered_map<std::string, TopicDataPtr>;
  // Map topic type to QoSMap
  using TopicTypeMap = std::unordered_map<std::string, TopicQoSMap>;
  // Map topic name to TopicTypeMap
  // This uses a map that sorts elements by name because some parts of the client libraries
  // expect that these are returned in alphabetical order.
  using TopicMap = std::map<std::string, TopicTypeMap>;

  // Entries for pub/sub.
  TopicMap pubs_ = {};
  TopicMap subs_ = {};

  // Entries for service/client.
  TopicMap clients_ = {};
  TopicMap services_ = {};
};
// Shared handle to a GraphNode; this is the value type stored in GraphCache's node map.
using GraphNodePtr = std::shared_ptr<GraphNode>;

// Cache of the ROS graph (nodes and their pub/sub/service/client entities), updated by
// parsing PUT/DELETE messages on liveliness-token key-expressions and queried through
// the rmw-style accessors below. Only declarations are visible here; the definitions
// live in the corresponding source file.
class GraphCache final
{
public:
  // Callback invoked with the change in an event's count.
  // NOTE(review): int32_t is used here but <cstdint> is not included directly by this
  // header; presumably it arrives transitively through the rmw/rcutils headers — confirm.
  using GraphCacheEventCallback = std::function<void (int32_t change)>;

  // Construct a cache for the session identified by zid.
  // NOTE(review): presumably zid is stored in string form in zid_str_ — confirm.
  explicit GraphCache(const zenoh::Id & zid);

  // Parse a PUT message over a token's key-expression and update the graph.
  // NOTE(review): ignore_from_current_session presumably skips tokens that originate
  // from this session — confirm in the implementation.
  void parse_put(const std::string & keyexpr, bool ignore_from_current_session = false);
  // Parse a DELETE message over a token's key-expression and update the graph.
  // ignore_from_current_session: see the note on parse_put.
  void parse_del(const std::string & keyexpr, bool ignore_from_current_session = false);

  // Graph queries. Each returns an rmw_ret_t status and writes its result through the
  // output pointer argument(s). Exact error codes are defined by the implementation,
  // which is not visible in this header.

  // Fill node_names, node_namespaces and (presumably optionally — confirm) enclaves
  // with the nodes known to the cache, allocating with allocator.
  rmw_ret_t get_node_names(
    rcutils_string_array_t * node_names,
    rcutils_string_array_t * node_namespaces,
    rcutils_string_array_t * enclaves,
    rcutils_allocator_t * allocator) const;

  // Fill topic_names_and_types with the topics in the graph.
  // NOTE(review): no_demangle presumably disables ROS name/type demangling — confirm.
  rmw_ret_t get_topic_names_and_types(
    rcutils_allocator_t * allocator,
    bool no_demangle,
    rmw_names_and_types_t * topic_names_and_types) const;

  // Write to subscription_count the number of subscriptions matched with the publisher
  // described by pub_topic_info.
  // NOTE(review): unlike the other queries this method is not const — confirm whether
  // it needs to mutate state.
  rmw_ret_t publisher_count_matched_subscriptions(
    const liveliness::TopicInfo & pub_topic_info,
    size_t * subscription_count);

  // Write to publisher_count the number of publishers matched with the subscription
  // described by sub_topic_info.
  // NOTE(review): not const either; see the note on publisher_count_matched_subscriptions.
  rmw_ret_t subscription_count_matched_publishers(
    const liveliness::TopicInfo & sub_topic_info,
    size_t * publisher_count);

  // Fill service_names_and_types with the services in the graph.
  rmw_ret_t get_service_names_and_types(
    rcutils_allocator_t * allocator,
    rmw_names_and_types_t * service_names_and_types) const;

  // Write to count the number of publishers for topic_name.
  rmw_ret_t count_publishers(
    const char * topic_name,
    size_t * count) const;

  // Write to count the number of subscriptions for topic_name.
  rmw_ret_t count_subscriptions(
    const char * topic_name,
    size_t * count) const;

  // Write to count the number of services for service_name.
  rmw_ret_t count_services(
    const char * service_name,
    size_t * count) const;

  // Write to count the number of clients for service_name.
  rmw_ret_t count_clients(
    const char * service_name,
    size_t * count) const;

  // Fill names_and_types with the entities of kind entity_type that belong to the node
  // identified by node_name and node_namespace.
  rmw_ret_t get_entity_names_and_types_by_node(
    liveliness::EntityType entity_type,
    rcutils_allocator_t * allocator,
    const char * node_name,
    const char * node_namespace,
    bool no_demangle,
    rmw_names_and_types_t * names_and_types) const;

  // Fill endpoints_info with the endpoints of kind entity_type on topic_name.
  rmw_ret_t get_entities_info_by_topic(
    liveliness::EntityType entity_type,
    rcutils_allocator_t * allocator,
    const char * topic_name,
    bool no_demangle,
    rmw_topic_endpoint_info_array_t * endpoints_info) const;

  // Write to is_available whether a service server exists for the client described by
  // client_topic_info.
  rmw_ret_t service_server_is_available(
    const liveliness::TopicInfo & client_topic_info,
    bool * is_available) const;

  // Register callback for event_type on the entity identified by entity_gid_hash.
  // Only one callback per (entity, event type) is stored; see the note on
  // GraphEventCallbackMap below. Changes recorded before registration are tracked in
  // unregistered_event_changes_ (see its comment).
  void set_qos_event_callback(
    std::size_t entity_gid_hash,
    const rmw_zenoh_event_type_t & event_type,
    GraphCacheEventCallback callback);

  // Remove the event callbacks registered for the entity identified by entity_gid_hash.
  void remove_qos_event_callbacks(std::size_t entity_gid_hash);

  // Static predicate on an entity.
  // NOTE(review): presumably true when the entity is a publisher — confirm whether
  // clients are treated the same way.
  static bool is_entity_pub(const liveliness::Entity & entity);

private:
  // Helper function to convert an Entity into a GraphNode.
  // NOTE(review): an earlier comment stated that this updates bookkeeping variables in
  // GraphCache, but the method is declared const, so it cannot modify non-mutable
  // members — confirm which of the two is intended.
  std::shared_ptr<GraphNode> make_graph_node(const liveliness::Entity & entity) const;

  // Helper function to update the TopicMaps within the node and the cache for the
  // entire graph when an entity is added.
  void update_topic_maps_for_put(
    GraphNodePtr graph_node,
    liveliness::ConstEntityPtr entity);

  // Add entity to a single topic_map.
  // NOTE(review): report_events presumably enables matched-event handling — confirm.
  void update_topic_map_for_put(
    GraphNode::TopicMap & topic_map,
    liveliness::ConstEntityPtr entity,
    bool report_events = false);

  // Counterpart of update_topic_maps_for_put for the removal of an entity.
  void update_topic_maps_for_del(
    GraphNodePtr graph_node,
    liveliness::ConstEntityPtr entity);

  // Remove entity from a single topic_map.
  // report_events: see the note on update_topic_map_for_put.
  void update_topic_map_for_del(
    GraphNode::TopicMap & topic_map,
    liveliness::ConstEntityPtr entity,
    bool report_events = false);

  // Remove the entries listed in to_remove from from_cache.
  void remove_topic_map_from_cache(
    const GraphNode::TopicMap & to_remove,
    GraphNode::TopicMap & from_cache);

  // NOTE(review): presumably compares the entity's Zenoh id with zid_str_ — confirm.
  bool is_entity_local(const liveliness::Entity & entity) const;

  // Record a change in the count of event_id for entity.
  void update_event_counters(
    liveliness::ConstEntityPtr entity,
    const rmw_zenoh_event_type_t event_id,
    int32_t change);

  // Handle matched events for entity against the entries in topic_qos_map when an
  // entity is added.
  void handle_matched_events_for_put(
    liveliness::ConstEntityPtr entity,
    const GraphNode::TopicQoSMap & topic_qos_map);

  // Handle matched events for entity against the entries in topic_qos_map when an
  // entity is removed.
  void handle_matched_events_for_del(
    liveliness::ConstEntityPtr entity,
    const GraphNode::TopicQoSMap & topic_qos_map);

  // NOTE(review): presumably the string form of the zenoh::Id passed to the
  // constructor — confirm.
  std::string zid_str_;
  /*
  namespace_1:
    node_1:
      enclave:
      publishers: [
          {
            topic:
            type:
            qos:
          }
      ],
      subscriptions: [
          {
            topic:
            type:
            qos:
          }
      ],
  namespace_2:
    node_n:
  */

  // We rely on a multimap to store nodes with duplicate names.
  using NodeMap = std::multimap<std::string, GraphNodePtr>;
  using NamespaceMap = std::unordered_map<std::string, NodeMap>;
  // Map namespace to a map of <node_name, GraphNodePtr>.
  NamespaceMap graph_ = {};
  // Number of nodes currently stored in graph_ (by its name; maintained by the
  // implementation).
  size_t total_nodes_in_graph_{0};

  // Optimize pub/sub lookups across the graph.
  GraphNode::TopicMap graph_topics_ = {};
  // Optimize service/client lookups across the graph.
  GraphNode::TopicMap graph_services_ = {};

  // Map an event type to the callback registered for it.
  using GraphEventCallbacks = std::unordered_map<rmw_zenoh_event_type_t, GraphCacheEventCallback>;
  // Map an entity's gid_hash to a map of event callbacks.
  // Note: Since we use unordered_map, we will only store a single callback for an
  // entity string. So we do not support the case where a node creates a duplicate
  // pub/sub with the exact same topic, type & QoS but registers a different callback
  // for the same event type. We could switch to a multimap here but removing the callback
  // will be impossible right now since entities do not have unique IDs.
  using GraphEventCallbackMap = std::unordered_map<std::size_t, GraphEventCallbacks>;
  // EventCallbackMap for each type of event we support in rmw_zenoh_cpp.
  GraphEventCallbackMap event_callbacks_;
  // Map an entity's gid_hash to another map of event_types which map to the change in
  // number of events.
  // This map is used to track changes of events which do not have callbacks registered yet.
  // When a callback does get registered, we check for any change history and trigger the callback
  // immediately after which we reset this map accordingly.
  std::unordered_map<std::size_t,
    std::unordered_map<rmw_zenoh_event_type_t, int32_t>> unregistered_event_changes_;
  // NOTE(review): presumably guards event_callbacks_ and unregistered_event_changes_ —
  // confirm in the implementation.
  std::mutex events_mutex_;

  // Mutex to lock before modifying the members above.
  // Declared mutable so that const member functions can lock it.
  mutable std::mutex graph_mutex_;
};
}  // namespace rmw_zenoh_cpp

#endif  // DETAIL__GRAPH_CACHE_HPP_