summaryrefslogtreecommitdiff
path: root/implementation/routing/include/routing_manager_impl.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/routing/include/routing_manager_impl.hpp')
-rw-r--r--implementation/routing/include/routing_manager_impl.hpp69
1 files changed, 56 insertions, 13 deletions
diff --git a/implementation/routing/include/routing_manager_impl.hpp b/implementation/routing/include/routing_manager_impl.hpp
index 13936a4..b08f2cd 100644
--- a/implementation/routing/include/routing_manager_impl.hpp
+++ b/implementation/routing/include/routing_manager_impl.hpp
@@ -58,7 +58,7 @@ public:
void start();
void stop();
- void offer_service(client_t _client, service_t _service,
+ bool offer_service(client_t _client, service_t _service,
instance_t _instance, major_version_t _major,
minor_version_t _minor);
@@ -82,7 +82,7 @@ public:
bool send(client_t _client, std::shared_ptr<message> _message, bool _flush);
bool send(client_t _client, const byte_t *_data, uint32_t _size,
- instance_t _instance, bool _flush, bool _reliable, bool _initial = false);
+ instance_t _instance, bool _flush, bool _reliable);
bool send_to(const std::shared_ptr<endpoint_definition> &_target,
std::shared_ptr<message> _message);
@@ -103,10 +103,11 @@ public:
bool _is_provided);
void notify(service_t _service, instance_t _instance, event_t _event,
- std::shared_ptr<payload> _payload);
+ std::shared_ptr<payload> _payload, bool _force);
void notify_one(service_t _service, instance_t _instance,
- event_t _event, std::shared_ptr<payload> _payload, client_t _client);
+ event_t _event, std::shared_ptr<payload> _payload,
+ client_t _client, bool _force);
void on_subscribe_nack(client_t _client, service_t _service,
instance_t _instance, eventgroup_t _eventgroup);
@@ -117,15 +118,15 @@ public:
void on_identify_response(client_t _client, service_t _service, instance_t _instance,
bool _reliable);
- bool queue_message(const byte_t *_data, uint32_t _size) const;
-
// interface to stub
std::shared_ptr<endpoint> find_local(client_t _client);
std::shared_ptr<endpoint> find_or_create_local(client_t _client);
void remove_local(client_t _client);
- void on_stop_offer_service(service_t _service, instance_t _instance,
+ void on_stop_offer_service(client_t _client, service_t _service, instance_t _instance,
major_version_t _major, minor_version_t _minor);
+ void on_pong(client_t _client);
+
// interface "endpoint_host"
std::shared_ptr<endpoint> find_or_create_remote_client(service_t _service,
instance_t _instance,
@@ -151,7 +152,7 @@ public:
std::shared_ptr<endpoint> create_service_discovery_endpoint(const std::string &_address,
uint16_t _port, bool _reliable);
void init_routing_info();
- void add_routing_info(service_t _service, instance_t _instance,
+ std::chrono::milliseconds add_routing_info(service_t _service, instance_t _instance,
major_version_t _major, minor_version_t _minor, ttl_t _ttl,
const boost::asio::ip::address &_reliable_address,
uint16_t _reliable_port,
@@ -165,11 +166,11 @@ public:
eventgroup_t _eventgroup,
std::shared_ptr<endpoint_definition> _subscriber,
std::shared_ptr<endpoint_definition> _target,
- const std::chrono::high_resolution_clock::time_point &_expiration);
+ const std::chrono::steady_clock::time_point &_expiration);
bool on_subscribe_accepted(service_t _service, instance_t _instance,
eventgroup_t _eventgroup,
std::shared_ptr<endpoint_definition> _target,
- const std::chrono::high_resolution_clock::time_point &_expiration);
+ const std::chrono::steady_clock::time_point &_expiration);
void on_unsubscribe(service_t _service, instance_t _instance,
eventgroup_t _eventgroup,
std::shared_ptr<endpoint_definition> _target);
@@ -179,11 +180,14 @@ public:
void expire_subscriptions(const boost::asio::ip::address &_address);
void expire_services(const boost::asio::ip::address &_address);
- std::chrono::high_resolution_clock::time_point expire_subscriptions();
+ std::chrono::steady_clock::time_point expire_subscriptions();
bool has_identified(client_t _client, service_t _service,
instance_t _instance, bool _reliable);
+ void on_clientendpoint_error(client_t _client);
+ void confirm_pending_offers(client_t _client);
+
private:
bool deliver_message(const byte_t *_data, length_t _length,
instance_t _instance, bool _reliable);
@@ -197,7 +201,7 @@ private:
std::shared_ptr<endpoint> create_client_endpoint(
const boost::asio::ip::address &_address,
- uint16_t _local_port, uint16_t _remote_port,
+ uint16_t _local_port, uint16_t _remote_port,
bool _reliable, client_t _client, bool _start);
std::shared_ptr<endpoint> create_server_endpoint(uint16_t _port,
@@ -223,6 +227,9 @@ private:
void stop_and_delete_client_endpoint(std::shared_ptr<endpoint> _endpoint);
void clear_multicast_endpoints(service_t _service, instance_t _instance);
+ bool is_identifying(client_t _client, service_t _service,
+ instance_t _instance, bool _reliable);
+
private:
return_code_e check_error(const byte_t *_data, length_t _size,
instance_t _instance);
@@ -244,6 +251,23 @@ private:
client_t _client,
const std::shared_ptr<endpoint_definition> &_target);
+ void log_version_timer_cbk(boost::system::error_code const & _error);
+
+ void clear_remote_service_info(service_t _service, instance_t _instance, bool _reliable);
+
+ bool handle_local_offer_service(client_t _client, service_t _service,
+ instance_t _instance, major_version_t _major,minor_version_t _minor);
+
+ void remove_specific_client_endpoint(client_t _client, service_t _service, instance_t _instance, bool _reliable);
+
+ void clear_identified_clients( service_t _service, instance_t _instance);
+
+ void clear_identifying_clients( service_t _service, instance_t _instance);
+
+ void remove_identified_client(service_t _service, instance_t _instance, client_t _client);
+
+ void remove_identifying_client(service_t _service, instance_t _instance, client_t _client);
+
std::shared_ptr<routing_manager_stub> stub_;
std::shared_ptr<sd::service_discovery> discovery_;
@@ -272,17 +296,36 @@ private:
std::mutex identified_clients_mutex_;
std::mutex requested_services_mutex_;
+ std::mutex remote_subscribers_mutex_;
std::map<service_t, std::map<instance_t, std::map<client_t,
std::set<std::shared_ptr<endpoint_definition>>>>> remote_subscribers_;
std::mutex specific_endpoint_clients_mutex_;
std::map<service_t, std::map<instance_t, std::unordered_set<client_t>>>specific_endpoint_clients_;
std::map<service_t, std::map<instance_t,
- std::map<bool, std::unordered_set<client_t> > > >identified_clients_;
+ std::map<bool, std::unordered_set<client_t> > > > identified_clients_;
+ std::map<service_t, std::map<instance_t,
+ std::map<bool, std::unordered_set<client_t> > > > identifying_clients_;
std::shared_ptr<serviceinfo> sd_info_;
std::map<bool, std::set<uint16_t>> used_client_ports_;
+
+ boost::asio::steady_timer version_log_timer_;
+
+#ifndef WITHOUT_SYSTEMD
+ boost::asio::steady_timer watchdog_timer_;
+ void watchdog_cbk(boost::system::error_code const &_error);
+#endif
+
+ std::mutex pending_offers_mutex_;
+ // map to store pending offers.
+ // 1st client id in tuple: client id of new offering application
+ // 2nd client id in tuple: client id of previously/stored offering application
+ std::map<service_t,
+ std::map<instance_t,
+ std::tuple<major_version_t, minor_version_t,
+ client_t, client_t>>> pending_offers_;
};
} // namespace vsomeip