diff options
Diffstat (limited to 'implementation/routing/include/routing_manager_impl.hpp')
-rw-r--r-- | implementation/routing/include/routing_manager_impl.hpp | 69 |
1 files changed, 56 insertions, 13 deletions
diff --git a/implementation/routing/include/routing_manager_impl.hpp b/implementation/routing/include/routing_manager_impl.hpp index 13936a4..b08f2cd 100644 --- a/implementation/routing/include/routing_manager_impl.hpp +++ b/implementation/routing/include/routing_manager_impl.hpp @@ -58,7 +58,7 @@ public: void start(); void stop(); - void offer_service(client_t _client, service_t _service, + bool offer_service(client_t _client, service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor); @@ -82,7 +82,7 @@ public: bool send(client_t _client, std::shared_ptr<message> _message, bool _flush); bool send(client_t _client, const byte_t *_data, uint32_t _size, - instance_t _instance, bool _flush, bool _reliable, bool _initial = false); + instance_t _instance, bool _flush, bool _reliable); bool send_to(const std::shared_ptr<endpoint_definition> &_target, std::shared_ptr<message> _message); @@ -103,10 +103,11 @@ public: bool _is_provided); void notify(service_t _service, instance_t _instance, event_t _event, - std::shared_ptr<payload> _payload); + std::shared_ptr<payload> _payload, bool _force); void notify_one(service_t _service, instance_t _instance, - event_t _event, std::shared_ptr<payload> _payload, client_t _client); + event_t _event, std::shared_ptr<payload> _payload, + client_t _client, bool _force); void on_subscribe_nack(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup); @@ -117,15 +118,15 @@ public: void on_identify_response(client_t _client, service_t _service, instance_t _instance, bool _reliable); - bool queue_message(const byte_t *_data, uint32_t _size) const; - // interface to stub std::shared_ptr<endpoint> find_local(client_t _client); std::shared_ptr<endpoint> find_or_create_local(client_t _client); void remove_local(client_t _client); - void on_stop_offer_service(service_t _service, instance_t _instance, + void on_stop_offer_service(client_t _client, service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor); + void on_pong(client_t _client); + // interface "endpoint_host" std::shared_ptr<endpoint> find_or_create_remote_client(service_t _service, instance_t _instance, @@ -151,7 +152,7 @@ public: std::shared_ptr<endpoint> create_service_discovery_endpoint(const std::string &_address, uint16_t _port, bool _reliable); void init_routing_info(); - void add_routing_info(service_t _service, instance_t _instance, + std::chrono::milliseconds add_routing_info(service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor, ttl_t _ttl, const boost::asio::ip::address &_reliable_address, uint16_t _reliable_port, @@ -165,11 +166,11 @@ public: eventgroup_t _eventgroup, std::shared_ptr<endpoint_definition> _subscriber, std::shared_ptr<endpoint_definition> _target, - const std::chrono::high_resolution_clock::time_point &_expiration); + const std::chrono::steady_clock::time_point &_expiration); bool on_subscribe_accepted(service_t _service, instance_t _instance, eventgroup_t _eventgroup, std::shared_ptr<endpoint_definition> _target, - const std::chrono::high_resolution_clock::time_point &_expiration); + const std::chrono::steady_clock::time_point &_expiration); void on_unsubscribe(service_t _service, instance_t _instance, eventgroup_t _eventgroup, std::shared_ptr<endpoint_definition> _target); @@ -179,11 +180,14 @@ public: void expire_subscriptions(const boost::asio::ip::address &_address); void expire_services(const boost::asio::ip::address &_address); - std::chrono::high_resolution_clock::time_point expire_subscriptions(); + std::chrono::steady_clock::time_point expire_subscriptions(); bool has_identified(client_t _client, service_t _service, instance_t _instance, bool _reliable); + void on_clientendpoint_error(client_t _client); + void confirm_pending_offers(client_t _client); + private: bool deliver_message(const byte_t *_data, length_t _length, instance_t _instance, bool _reliable); @@ -197,7 +201,7 @@ private: std::shared_ptr<endpoint> create_client_endpoint( const boost::asio::ip::address &_address, - uint16_t _local_port, uint16_t _remote_port, + uint16_t _local_port, uint16_t _remote_port, bool _reliable, client_t _client, bool _start); std::shared_ptr<endpoint> create_server_endpoint(uint16_t _port, @@ -223,6 +227,9 @@ private: void stop_and_delete_client_endpoint(std::shared_ptr<endpoint> _endpoint); void clear_multicast_endpoints(service_t _service, instance_t _instance); + bool is_identifying(client_t _client, service_t _service, + instance_t _instance, bool _reliable); + private: return_code_e check_error(const byte_t *_data, length_t _size, instance_t _instance); @@ -244,6 +251,23 @@ private: client_t _client, const std::shared_ptr<endpoint_definition> &_target); + void log_version_timer_cbk(boost::system::error_code const & _error); + + void clear_remote_service_info(service_t _service, instance_t _instance, bool _reliable); + + bool handle_local_offer_service(client_t _client, service_t _service, + instance_t _instance, major_version_t _major,minor_version_t _minor); + + void remove_specific_client_endpoint(client_t _client, service_t _service, instance_t _instance, bool _reliable); + + void clear_identified_clients( service_t _service, instance_t _instance); + + void clear_identifying_clients( service_t _service, instance_t _instance); + + void remove_identified_client(service_t _service, instance_t _instance, client_t _client); + + void remove_identifying_client(service_t _service, instance_t _instance, client_t _client); + std::shared_ptr<routing_manager_stub> stub_; std::shared_ptr<sd::service_discovery> discovery_; @@ -272,17 +296,36 @@ private: std::mutex identified_clients_mutex_; std::mutex requested_services_mutex_; + std::mutex remote_subscribers_mutex_; std::map<service_t, std::map<instance_t, std::map<client_t, std::set<std::shared_ptr<endpoint_definition>>>>> remote_subscribers_; std::mutex specific_endpoint_clients_mutex_; std::map<service_t, std::map<instance_t, std::unordered_set<client_t>>>specific_endpoint_clients_; std::map<service_t, std::map<instance_t, - std::map<bool, std::unordered_set<client_t> > > >identified_clients_; + std::map<bool, std::unordered_set<client_t> > > > identified_clients_; + std::map<service_t, std::map<instance_t, + std::map<bool, std::unordered_set<client_t> > > > identifying_clients_; std::shared_ptr<serviceinfo> sd_info_; std::map<bool, std::set<uint16_t>> used_client_ports_; + + boost::asio::steady_timer version_log_timer_; + +#ifndef WITHOUT_SYSTEMD + boost::asio::steady_timer watchdog_timer_; + void watchdog_cbk(boost::system::error_code const &_error); +#endif + + std::mutex pending_offers_mutex_; + // map to store pending offers. + // 1st client id in tuple: client id of new offering application + // 2nd client id in tuple: client id of previously/stored offering application + std::map<service_t, + std::map<instance_t, + std::tuple<major_version_t, minor_version_t, + client_t, client_t>>> pending_offers_; }; } // namespace vsomeip |