diff options
Diffstat (limited to 'implementation/routing/include')
7 files changed, 112 insertions, 87 deletions
diff --git a/implementation/routing/include/event.hpp b/implementation/routing/include/event.hpp index e127807..93eee34 100644 --- a/implementation/routing/include/event.hpp +++ b/implementation/routing/include/event.hpp @@ -77,13 +77,20 @@ public: // SIP_RPC_359 (epsilon change) void set_epsilon_change_function(const epsilon_change_func_t &_epsilon_change_func); - const std::set<eventgroup_t> & get_eventgroups() const; + const std::set<eventgroup_t> get_eventgroups() const; + std::set<eventgroup_t> get_eventgroups(client_t _client) const; void add_eventgroup(eventgroup_t _eventgroup); void set_eventgroups(const std::set<eventgroup_t> &_eventgroups); void notify_one(const std::shared_ptr<endpoint_definition> &_target, bool _flush); void notify_one(client_t _client, bool _flush); + bool add_subscriber(eventgroup_t _eventgroup, client_t _client); + void remove_subscriber(eventgroup_t _eventgroup, client_t _client); + bool has_subscriber(eventgroup_t _eventgroup, client_t _client); + std::set<client_t> get_subscribers(); + void clear_subscribers(); + void add_ref(client_t _client, bool _is_provided); void remove_ref(client_t _client, bool _is_provided); bool has_ref(); @@ -96,6 +103,8 @@ public: bool has_ref(client_t _client, bool _is_provided); + std::set<client_t> get_subscribers(eventgroup_t _eventgroup); + private: void update_cbk(boost::system::error_code const &_error); void notify(bool _flush); @@ -120,11 +129,10 @@ private: std::chrono::milliseconds cycle_; std::atomic<bool> change_resets_cycle_; - std::atomic<bool> is_updating_on_change_; - std::mutex eventgroups_mutex_; - std::set<eventgroup_t> eventgroups_; + mutable std::mutex eventgroups_mutex_; + std::map<eventgroup_t, std::set<client_t>> eventgroups_; std::atomic<bool> is_set_; std::atomic<bool> is_provided_; @@ -133,7 +141,6 @@ private: std::map<client_t, std::map<bool, uint32_t>> refs_; std::atomic<bool> is_shadow_; - std::atomic<bool> is_cache_placeholder_; epsilon_change_func_t epsilon_change_func_; diff --git a/implementation/routing/include/routing_manager.hpp b/implementation/routing/include/routing_manager.hpp index 4911701..8c408c3 100644 --- a/implementation/routing/include/routing_manager.hpp +++ b/implementation/routing/include/routing_manager.hpp @@ -52,11 +52,11 @@ public: virtual void subscribe(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - major_version_t _major, + major_version_t _major, event_t _event, subscription_type_e _subscription_type) = 0; virtual void unsubscribe(client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup) = 0; + instance_t _instance, eventgroup_t _eventgroup, event_t _event) = 0; virtual bool send(client_t _client, std::shared_ptr<message> _message, bool _flush) = 0; @@ -81,7 +81,7 @@ public: virtual void unregister_event(client_t _client, service_t _service, instance_t _instance, event_t _event, bool _is_provided) = 0; - virtual std::shared_ptr<event> get_event(service_t _service, + virtual std::shared_ptr<event> find_event(service_t _service, instance_t _instance, event_t _event) const = 0; virtual std::set<std::shared_ptr<event>> find_events(service_t _service, @@ -99,6 +99,8 @@ public: instance_t _instance, bool _reliable) = 0; virtual void set_routing_state(routing_state_e _routing_state) = 0; + + }; } // namespace vsomeip diff --git a/implementation/routing/include/routing_manager_base.hpp b/implementation/routing/include/routing_manager_base.hpp index d3c0e04..625ed6a 100644 --- a/implementation/routing/include/routing_manager_base.hpp +++ b/implementation/routing/include/routing_manager_base.hpp @@ -73,19 +73,16 @@ public: virtual void unregister_event(client_t _client, service_t _service, instance_t _instance, event_t _event, bool _is_provided); - virtual std::shared_ptr<event> get_event(service_t _service, - instance_t _instance, event_t _event) const; - virtual std::set<std::shared_ptr<event>> find_events(service_t _service, instance_t _instance, eventgroup_t _eventgroup) const; virtual void subscribe(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - major_version_t _major, + major_version_t _major, event_t _event, subscription_type_e _subscription_type); virtual void unsubscribe(client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup); + instance_t _instance, eventgroup_t _eventgroup, event_t _event); virtual void notify(service_t _service, instance_t _instance, event_t _event, std::shared_ptr<payload> _payload, @@ -119,6 +116,9 @@ public: virtual void set_routing_state(routing_state_e _routing_state) = 0; + virtual std::shared_ptr<event> find_event(service_t _service, instance_t _instance, + event_t _event) const; + protected: std::shared_ptr<serviceinfo> find_service(service_t _service, instance_t _instance) const; std::shared_ptr<serviceinfo> create_service_info(service_t _service, @@ -139,14 +139,9 @@ protected: std::unordered_set<client_t> get_connected_clients(); - std::shared_ptr<event> find_event(service_t _service, instance_t _instance, - event_t _event) const; std::shared_ptr<eventgroupinfo> find_eventgroup(service_t _service, instance_t _instance, eventgroup_t _eventgroup) const; - std::set<client_t> find_local_clients(service_t _service, - instance_t _instance, eventgroup_t _eventgroup); - void remove_eventgroup_info(service_t _service, instance_t _instance, eventgroup_t _eventgroup); @@ -160,7 +155,7 @@ protected: bool _flush, bool _reliable, uint8_t _command) const; bool insert_subscription(service_t _service, instance_t _instance, - eventgroup_t _eventgroup, client_t _client); + eventgroup_t _eventgroup, event_t _event, client_t _client); std::shared_ptr<deserializer> get_deserializer(); void put_deserializer(std::shared_ptr<deserializer>); @@ -170,9 +165,11 @@ protected: virtual void send_subscribe(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - major_version_t _major, subscription_type_e _subscription_type) = 0; + major_version_t _major, event_t _event, + subscription_type_e _subscription_type) = 0; - void remove_pending_subscription(service_t _service, instance_t _instance); + void remove_pending_subscription(service_t _service, instance_t _instance, + eventgroup_t _eventgroup, event_t _event); void send_pending_notify_ones(service_t _service, instance_t _instance, eventgroup_t _eventgroup, client_t _client); @@ -181,6 +178,10 @@ protected: void unset_all_eventpayloads(service_t _service, instance_t _instance, eventgroup_t _eventgroup); + void notify_one_current_value(client_t _client, service_t _service, + instance_t _instance, + eventgroup_t _eventgroup, event_t _event); + private: std::shared_ptr<endpoint> create_local_unlocked(client_t _client); std::shared_ptr<endpoint> find_local_unlocked(client_t _client); @@ -188,6 +189,9 @@ private: std::set<std::tuple<service_t, instance_t, eventgroup_t>> get_subscriptions(const client_t _client); + virtual bool create_placeholder_event_and_subscribe( + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + event_t _event, client_t _client) = 0; protected: routing_manager_host *host_; boost::asio::io_service &io_; @@ -209,34 +213,37 @@ protected: std::map<service_t, std::map<instance_t, std::map<eventgroup_t, std::shared_ptr<eventgroupinfo> > > > eventgroups_; + // Events (part of one or more eventgroups) mutable std::mutex events_mutex_; std::map<service_t, std::map<instance_t, std::map<event_t, std::shared_ptr<event> > > > events_; - std::mutex eventgroup_clients_mutex_; - std::map<service_t, - std::map<instance_t, std::map<eventgroup_t, std::set<client_t> > > > eventgroup_clients_; #ifdef USE_DLT std::shared_ptr<tc::trace_connector> tc_; #endif - struct eventgroup_data_t { + struct subscription_data_t { service_t service_; instance_t instance_; eventgroup_t eventgroup_; major_version_t major_; + event_t event_; subscription_type_e subscription_type_; - bool operator<(const eventgroup_data_t &_other) const { + bool operator<(const subscription_data_t &_other) const { return (service_ < _other.service_ || (service_ == _other.service_ && instance_ < _other.instance_) || (service_ == _other.service_ && instance_ == _other.instance_ - && eventgroup_ < _other.eventgroup_)); + && eventgroup_ < _other.eventgroup_) + || (service_ == _other.service_ + && instance_ == _other.instance_ + && eventgroup_ == _other.eventgroup_ + && event_ < _other.event_)); } }; - std::set<eventgroup_data_t> pending_subscriptions_; + std::set<subscription_data_t> pending_subscriptions_; private: services_t services_; diff --git a/implementation/routing/include/routing_manager_impl.hpp b/implementation/routing/include/routing_manager_impl.hpp index d29a3d7..2992c9e 100644 --- a/implementation/routing/include/routing_manager_impl.hpp +++ b/implementation/routing/include/routing_manager_impl.hpp @@ -74,11 +74,11 @@ public: instance_t _instance); void subscribe(client_t _client, service_t _service, instance_t _instance, - eventgroup_t _eventgroup, major_version_t _major, + eventgroup_t _eventgroup, major_version_t _major, event_t _event, subscription_type_e _subscription_type); void unsubscribe(client_t _client, service_t _service, instance_t _instance, - eventgroup_t _eventgroup); + eventgroup_t _eventgroup, event_t _event); bool send(client_t _client, std::shared_ptr<message> _message, bool _flush); @@ -110,18 +110,15 @@ public: instance_t _instance, event_t _event, bool _is_provided); - void notify(service_t _service, instance_t _instance, event_t _event, - std::shared_ptr<payload> _payload, bool _force, bool _flush); - void notify_one(service_t _service, instance_t _instance, event_t _event, std::shared_ptr<payload> _payload, client_t _client, bool _force, bool _flush); void on_subscribe_nack(client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup); + instance_t _instance, eventgroup_t _eventgroup, event_t _event); void on_subscribe_ack(client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup); + instance_t _instance, eventgroup_t _eventgroup, event_t _event); void on_identify_response(client_t _client, service_t _service, instance_t _instance, bool _reliable); @@ -134,6 +131,7 @@ public: client_t _client) { return routing_manager_base::find_or_create_local(_client); } + void remove_local(client_t _client); void on_stop_offer_service(client_t _client, service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor); @@ -253,6 +251,8 @@ private: bool is_identifying(client_t _client, service_t _service, instance_t _instance, bool _reliable); + std::set<eventgroup_t> get_subscribed_eventgroups(service_t _service, + instance_t _instance); private: return_code_e check_error(const byte_t *_data, length_t _size, instance_t _instance); @@ -305,7 +305,8 @@ private: void send_subscribe(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - major_version_t _major, subscription_type_e _subscription_type); + major_version_t _major, event_t _event, + subscription_type_e _subscription_type); void on_net_if_state_changed(std::string _if, bool _available); @@ -317,6 +318,15 @@ private: void requested_service_remove(client_t _client, service_t _service, instance_t _instance); + void call_sd_reliable_endpoint_connected(service_t _service, instance_t _instance, + std::shared_ptr<endpoint> _endpoint); + + bool create_placeholder_event_and_subscribe(service_t _service, + instance_t _instance, + eventgroup_t _eventgroup, + event_t _event, + client_t _client); + std::shared_ptr<routing_manager_stub> stub_; std::shared_ptr<sd::service_discovery> discovery_; diff --git a/implementation/routing/include/routing_manager_proxy.hpp b/implementation/routing/include/routing_manager_proxy.hpp index 27c071d..e725913 100644 --- a/implementation/routing/include/routing_manager_proxy.hpp +++ b/implementation/routing/include/routing_manager_proxy.hpp @@ -9,6 +9,7 @@ #include <map> #include <mutex> #include <atomic> +#include <tuple> #include <boost/asio/io_service.hpp> #include <boost/asio/steady_timer.hpp> @@ -51,11 +52,11 @@ public: instance_t _instance); void subscribe(client_t _client, service_t _service, instance_t _instance, - eventgroup_t _eventgroup, major_version_t _major, + eventgroup_t _eventgroup, major_version_t _major, event_t _event, subscription_type_e _subscription_type); void unsubscribe(client_t _client, service_t _service, instance_t _instance, - eventgroup_t _eventgroup); + eventgroup_t _eventgroup, event_t _event); bool send(client_t _client, const byte_t *_data, uint32_t _size, instance_t _instance, bool _flush = true, bool _reliable = false); @@ -113,13 +114,14 @@ private: bool _is_field, bool _is_provided); void send_subscribe(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - major_version_t _major, subscription_type_e _subscription_type); + major_version_t _major, event_t _event, + subscription_type_e _subscription_type); void send_subscribe_nack(client_t _subscriber, service_t _service, - instance_t _instance, eventgroup_t _eventgroup); + instance_t _instance, eventgroup_t _eventgroup, event_t _event); void send_subscribe_ack(client_t _subscriber, service_t _service, - instance_t _instance, eventgroup_t _eventgroup); + instance_t _instance, eventgroup_t _eventgroup, event_t _event); bool is_field(service_t _service, instance_t _instance, event_t _event) const; @@ -155,6 +157,12 @@ private: bool is_client_known(client_t _client); + bool create_placeholder_event_and_subscribe(service_t _service, + instance_t _instance, + eventgroup_t _eventgroup, + event_t _event, + client_t _client); + private: enum class inner_state_type_e : std::uint8_t { ST_REGISTERED = 0x0, @@ -197,42 +205,16 @@ private: std::set<eventgroup_t> eventgroups_; bool operator<(const event_data_t &_other) const { - if (service_ < _other.service_) { - return true; - } - if (service_ == _other.service_ && instance_ < _other.instance_) { - return true; - } - if (service_ == _other.service_ && instance_ == _other.instance_ - && event_ < _other.event_) { - return true; - } - if (service_ == _other.service_ && instance_ == _other.instance_ - && event_ == _other.event_ - && is_provided_ != _other.is_provided_) { - return true; - } - if (service_ == _other.service_ - && instance_ == _other.instance_ - && event_ == _other.event_ - && is_provided_ == _other.is_provided_ - && is_field_ != _other.is_field_) { - return true; - } - if (service_ == _other.service_ - && instance_ == _other.instance_ - && event_ == _other.event_ - && is_provided_ == _other.is_provided_ - && is_field_ == _other.is_field_ - && eventgroups_ < _other.eventgroups_) { - return true; - } - return false; - } + return std::tie(service_, instance_, event_, is_field_, + is_provided_, eventgroups_) + < std::tie(_other.service_, _other.instance_, _other.event_, + _other.is_field_, _other.is_provided_, + _other.eventgroups_); + } }; std::set<event_data_t> pending_event_registrations_; - std::map<client_t, std::set<eventgroup_data_t>> pending_ingoing_subscripitons_; + std::map<client_t, std::set<subscription_data_t>> pending_ingoing_subscripitons_; std::mutex pending_ingoing_subscripitons_mutex_; std::mutex deserialize_mutex_; diff --git a/implementation/routing/include/routing_manager_stub.hpp b/implementation/routing/include/routing_manager_stub.hpp index 3cf89a7..819352f 100644 --- a/implementation/routing/include/routing_manager_stub.hpp +++ b/implementation/routing/include/routing_manager_stub.hpp @@ -58,20 +58,20 @@ public: instance_t _instance, major_version_t _major, minor_version_t _minor); void send_subscribe(std::shared_ptr<vsomeip::endpoint> _target, - client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup, - major_version_t _major, bool _is_remote_subscriber); + client_t _client, service_t _service, instance_t _instance, + eventgroup_t _eventgroup, major_version_t _major, + event_t _event, bool _is_remote_subscriber); void send_unsubscribe(std::shared_ptr<vsomeip::endpoint> _target, client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - bool _is_remote_subscriber); + event_t _event, bool _is_remote_subscriber); void send_subscribe_nack(client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup); + instance_t _instance, eventgroup_t _eventgroup, event_t _event); void send_subscribe_ack(client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup); + instance_t _instance, eventgroup_t _eventgroup, event_t _event); bool contained_in_routing_info(client_t _client, service_t _service, instance_t _instance, major_version_t _major, @@ -82,6 +82,9 @@ public: bool is_registered(client_t _client) const; void deregister_erroneous_client(client_t _client); client_t get_client() const; + void on_request_service(client_t _client, service_t _service, + instance_t _instance, major_version_t _major, + minor_version_t _minor); #ifndef _WIN32 virtual bool check_credentials(client_t _client, uid_t _uid, gid_t _gid); #endif @@ -91,9 +94,18 @@ private: void on_register_application(client_t _client); void on_deregister_application(client_t _client); - void broadcast_routing_info(bool _empty = false, - client_t _ignore = VSOMEIP_ROUTING_CLIENT); - void send_routing_info(client_t _client, bool _empty = false); + void broadcast_routing_stop(); + + void send_routing_info_delta(client_t _target, routing_info_entry_e _entry, + client_t _client, service_t _service = ANY_SERVICE, + instance_t _instance = ANY_INSTANCE, + major_version_t _major = ANY_MAJOR, + minor_version_t _minor = ANY_MINOR); + + void inform_requesters(client_t _hoster, service_t _service, + instance_t _instance, major_version_t _major, + minor_version_t _minor, routing_info_entry_e _entry, + bool _inform_service); void broadcast_ping() const; void on_pong(client_t _client); @@ -109,6 +121,8 @@ private: (void)_routing_state; }; + bool is_already_connected(client_t _source, client_t _sink); + private: routing_manager_stub_host *host_; boost::asio::io_service &io_; @@ -145,6 +159,9 @@ private: boost::asio::steady_timer pinged_clients_timer_; std::mutex pinged_clients_mutex_; std::map<client_t, boost::asio::steady_timer::time_point> pinged_clients_; + + std::map<client_t, std::map<service_t, std::map<instance_t, std::pair<major_version_t, minor_version_t> > > > service_requests_; + std::map<client_t, std::set<client_t>> connection_matrix_; }; } // namespace vsomeip diff --git a/implementation/routing/include/routing_manager_stub_host.hpp b/implementation/routing/include/routing_manager_stub_host.hpp index cb8e83c..1606439 100644 --- a/implementation/routing/include/routing_manager_stub_host.hpp +++ b/implementation/routing/include/routing_manager_stub_host.hpp @@ -40,17 +40,17 @@ public: virtual void subscribe(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - major_version_t _major, + major_version_t _major, event_t _event, subscription_type_e _subscription_type) = 0; virtual void on_subscribe_nack(client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup) = 0; + instance_t _instance, eventgroup_t _eventgroup, event_t _event) = 0; virtual void on_subscribe_ack(client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup) = 0; + instance_t _instance, eventgroup_t _eventgroup, event_t _event) = 0; virtual void unsubscribe(client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup) = 0; + instance_t _instance, eventgroup_t _eventgroup, event_t _event) = 0; virtual void on_message(service_t _service, instance_t _instance, const byte_t *_data, length_t _size, bool _reliable) = 0; |