diff options
author | Juergen Gehring <juergen.gehring@bmw.de> | 2018-01-25 00:40:08 -0800 |
---|---|---|
committer | Juergen Gehring <juergen.gehring@bmw.de> | 2018-01-25 00:40:08 -0800 |
commit | 565b97b0108a02ef41284629a6d226c053c0dd7e (patch) | |
tree | 5a0b28ba67e3ac8d4cf349ea3a464d8cfd4a5ac1 /implementation | |
parent | 8936891b5db1a0c894a3ec0af52c081b52cca46c (diff) | |
download | vSomeIP-565b97b0108a02ef41284629a6d226c053c0dd7e.tar.gz |
vsomeip 2.10.82.10.8
Diffstat (limited to 'implementation')
13 files changed, 638 insertions, 340 deletions
diff --git a/implementation/configuration/include/policy.hpp b/implementation/configuration/include/policy.hpp index e94d7b5..89e8a96 100644 --- a/implementation/configuration/include/policy.hpp +++ b/implementation/configuration/include/policy.hpp @@ -14,12 +14,18 @@ namespace vsomeip { namespace cfg { struct policy { + policy() : + uid_(0), is_uid_set_(false), gid_(0), is_gid_set_(false) { + } + std::set<std::pair<service_t, instance_t>> allowed_services_; std::set<std::pair<service_t, instance_t>> allowed_offers_; std::set<std::pair<service_t, instance_t>> denied_services_; std::set<std::pair<service_t, instance_t>> denied_offers_; std::uint32_t uid_; + bool is_uid_set_; std::uint32_t gid_; + bool is_gid_set_; bool allow_; }; diff --git a/implementation/configuration/src/configuration_impl.cpp b/implementation/configuration/src/configuration_impl.cpp index 0920b66..178481c 100644 --- a/implementation/configuration/src/configuration_impl.cpp +++ b/implementation/configuration/src/configuration_impl.cpp @@ -1667,7 +1667,7 @@ void configuration_impl::load_policy(const boost::property_tree::ptree &_tree) { } } if (overrides) { - VSOMEIP_WARNING << std::hex << "Security configuration: " + VSOMEIP_INFO << std::hex << "Security configuration: " << "Client range 0x" << firstClient << " - 0x" << lastClient << " overrides policy of " << std::dec << overrides << " clients"; @@ -1684,31 +1684,33 @@ void configuration_impl::load_policy(const boost::property_tree::ptree &_tree) { its_converter >> client; if (client != 0x0) { if (policies_.find(client) != policies_.end()) { - VSOMEIP_WARNING << std::hex << "Security configuration: " + VSOMEIP_INFO << std::hex << "Security configuration: " << "Overriding policy for client 0x" << client << "."; } policies_[client] = policy; } } } else if (i->first == "credentials") { - uint32_t uid = 0x0; - uint32_t gid = 0x0; for (auto n = i->second.begin(); n != i->second.end(); ++n) { if (n->first == "uid") { std::stringstream its_converter; std::string value = n->second.data(); - its_converter << std::dec << value; - its_converter >> uid; + if (value != "any") { + its_converter << std::dec << value; + its_converter >> policy->uid_ ; + policy->is_uid_set_ = true; + } } else if (n->first == "gid") { std::stringstream its_converter; std::string value = n->second.data(); - its_converter << std::dec << value; - its_converter >> gid; + if (value != "any") { + its_converter << std::dec << value; + its_converter >> policy->gid_ ; + policy->is_gid_set_ = true; + } } } - policy->uid_ = uid; - policy->gid_ = gid; } else if (i->first == "allow") { if (allow_deny_set) { VSOMEIP_WARNING << "Security configuration: \"allow\" tag overrides " @@ -2410,7 +2412,8 @@ bool configuration_impl::check_credentials(client_t _client, uint32_t _uid, } auto its_client = policies_.find(_client); if (its_client != policies_.end()) { - if (its_client->second->uid_ == _uid && its_client->second->gid_ == _gid) { + if ((!its_client->second->is_uid_set_ || its_client->second->uid_ == _uid) + && (!its_client->second->is_gid_set_ || its_client->second->gid_ == _gid)) { return true; } } diff --git a/implementation/endpoints/include/client_endpoint_impl.hpp b/implementation/endpoints/include/client_endpoint_impl.hpp index d8c758d..b72ea7e 100644 --- a/implementation/endpoints/include/client_endpoint_impl.hpp +++ b/implementation/endpoints/include/client_endpoint_impl.hpp @@ -70,6 +70,11 @@ public: virtual void print_status() = 0;
protected:
+ enum class cei_state_e : std::uint8_t {
+ CLOSED,
+ CONNECTING,
+ ESTABLISHED
+ };
virtual void send_queued() = 0;
void shutdown_and_close_socket(bool _recreate_socket);
void shutdown_and_close_socket_unlocked(bool _recreate_socket);
@@ -84,7 +89,7 @@ protected: std::mutex connect_timer_mutex_;
boost::asio::steady_timer connect_timer_;
std::atomic<uint32_t> connect_timeout_;
- std::atomic<bool> is_connected_;
+ std::atomic<cei_state_e> state_;
// send data
message_buffer_ptr_t packetizer_;
diff --git a/implementation/endpoints/src/client_endpoint_impl.cpp b/implementation/endpoints/src/client_endpoint_impl.cpp index 55316a9..be8e684 100644 --- a/implementation/endpoints/src/client_endpoint_impl.cpp +++ b/implementation/endpoints/src/client_endpoint_impl.cpp @@ -37,7 +37,7 @@ client_endpoint_impl<Protocol>::client_endpoint_impl( socket_(new socket_type(_io)), remote_(_remote),
flush_timer_(_io), connect_timer_(_io),
connect_timeout_(VSOMEIP_DEFAULT_CONNECT_TIMEOUT), // TODO: use config variable
- is_connected_(false),
+ state_(cei_state_e::CLOSED),
packetizer_(std::make_shared<message_buffer_t>()),
queue_size_(0),
was_not_connected_(false),
@@ -55,11 +55,16 @@ bool client_endpoint_impl<Protocol>::is_client() const { template<typename Protocol>
bool client_endpoint_impl<Protocol>::is_connected() const {
- return is_connected_;
+ return state_ == cei_state_e::ESTABLISHED;
}
template<typename Protocol>
-void client_endpoint_impl<Protocol>::set_connected(bool _connected) {
is_connected_ = _connected;
}
+void client_endpoint_impl<Protocol>::set_connected(bool _connected) {
+ if (_connected) {
+ state_ = cei_state_e::ESTABLISHED;
+ } else {
+ state_ = cei_state_e::CLOSED;
+ }
}
template<typename Protocol>
void client_endpoint_impl<Protocol>::stop() {
{
std::lock_guard<std::mutex> its_lock(mutex_);
@@ -222,8 +227,8 @@ void client_endpoint_impl<Protocol>::connect_cbk( if (connect_timeout_ < VSOMEIP_MAX_CONNECT_TIMEOUT)
connect_timeout_ = (connect_timeout_ << 1);
- if (is_connected_) {
- is_connected_ = false;
+ if (state_ != cei_state_e::ESTABLISHED) {
+ state_ = cei_state_e::CLOSED;
its_host->on_disconnect(this->shared_from_this());
}
} else {
@@ -233,7 +238,7 @@ void client_endpoint_impl<Protocol>::connect_cbk( }
connect_timeout_ = VSOMEIP_DEFAULT_CONNECT_TIMEOUT; // TODO: use config variable
set_local_port();
- if (!is_connected_) {
+ if (state_ != cei_state_e::ESTABLISHED) {
its_host->on_connect(this->shared_from_this());
}
@@ -270,7 +275,7 @@ void client_endpoint_impl<Protocol>::send_cbk( send_queued();
}
} else if (_error == boost::asio::error::broken_pipe) {
- is_connected_ = false;
+ state_ = cei_state_e::CLOSED;
bool stopping(false);
{
std::lock_guard<std::mutex> its_lock(mutex_);
@@ -319,6 +324,7 @@ void client_endpoint_impl<Protocol>::send_cbk( connect();
} else if (_error == boost::asio::error::not_connected
|| _error == boost::asio::error::bad_descriptor) {
+ state_ = cei_state_e::CLOSED;
was_not_connected_ = true;
shutdown_and_close_socket(true);
connect();
diff --git a/implementation/endpoints/src/local_client_endpoint_impl.cpp b/implementation/endpoints/src/local_client_endpoint_impl.cpp index 009ac9f..b84cb0a 100644 --- a/implementation/endpoints/src/local_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/local_client_endpoint_impl.cpp @@ -46,7 +46,10 @@ bool local_client_endpoint_impl::is_local() const { }
void local_client_endpoint_impl::restart() {
- is_connected_ = false;
+ if (state_ == cei_state_e::CONNECTING) {
+ return;
+ }
+ state_ = cei_state_e::CONNECTING;
{
std::lock_guard<std::mutex> its_lock(mutex_);
sending_blocked_ = false;
@@ -113,6 +116,7 @@ void local_client_endpoint_impl::connect() { VSOMEIP_WARNING << "local_client_endpoint_impl::connect: "
<< "couldn't enable SO_REUSEADDR: " << its_error.message();
}
+ state_ = cei_state_e::CONNECTING;
socket_->connect(remote_, its_connect_error);
// Credentials
diff --git a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp index 143b95a..b1a2f74 100644 --- a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp @@ -61,7 +61,10 @@ void tcp_client_endpoint_impl::start() { }
void tcp_client_endpoint_impl::restart() {
- is_connected_ = false;
+ if (state_ == cei_state_e::CONNECTING) {
+ return;
+ }
+ state_ = cei_state_e::CONNECTING;
std::string address_port_local;
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
@@ -137,7 +140,7 @@ void tcp_client_endpoint_impl::connect() { "Error binding socket: " << its_bind_error.message();
}
}
-
+ state_ = cei_state_e::CONNECTING;
socket_->async_connect(
remote_,
std::bind(
@@ -180,7 +183,6 @@ void tcp_client_endpoint_impl::receive(message_buffer_ptr_t _recv_buffer, _recv_buffer->resize(its_required_capacity, 0x0);
}
buffer_size = _missing_capacity;
- _missing_capacity = 0;
} else if (buffer_shrink_threshold_
&& shrink_count_ > buffer_shrink_threshold_
&& _recv_buffer_size == 0) {
@@ -517,7 +519,7 @@ void tcp_client_endpoint_impl::receive_cbk( << _error.message() << "( " << std::dec << _error.value()
<< ") local: " << get_address_port_local()
<< " remote: " << get_address_port_remote();
- is_connected_ = false;
+ state_ = cei_state_e::CLOSED;
shutdown_and_close_socket_unlocked(false);
} else {
VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk: "
diff --git a/implementation/endpoints/src/udp_client_endpoint_impl.cpp b/implementation/endpoints/src/udp_client_endpoint_impl.cpp index 6bc0bf5..6252b40 100644 --- a/implementation/endpoints/src/udp_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/udp_client_endpoint_impl.cpp @@ -51,7 +51,7 @@ void udp_client_endpoint_impl::connect() { "Error binding socket: " << its_bind_error.message();
}
}
-
+ state_ = cei_state_e::CONNECTING;
socket_->async_connect(
remote_,
std::bind(
@@ -77,7 +77,10 @@ void udp_client_endpoint_impl::start() { }
void udp_client_endpoint_impl::restart() {
- is_connected_ = false;
+ if (state_ == cei_state_e::CONNECTING) {
+ return;
+ }
+ state_ = cei_state_e::CONNECTING;
{
std::lock_guard<std::mutex> its_lock(mutex_);
queue_.clear();
diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp index 0c443fd..2acd4eb 100644 --- a/implementation/routing/src/routing_manager_impl.cpp +++ b/implementation/routing/src/routing_manager_impl.cpp @@ -3204,32 +3204,14 @@ void routing_manager_impl::on_identify_response(client_t _client, service_t _ser void routing_manager_impl::identify_for_subscribe(client_t _client, service_t _service, instance_t _instance, major_version_t _major, subscription_type_e _subscription_type) { - if (_subscription_type == subscription_type_e::SU_RELIABLE_AND_UNRELIABLE - || _subscription_type == subscription_type_e::SU_PREFER_UNRELIABLE - || _subscription_type == subscription_type_e::SU_UNRELIABLE) { - if (!has_identified(_client, _service, _instance, false) - && !is_identifying(_client, _service, _instance, false)) { - if (!send_identify_message(_client, _service, _instance, _major, - false) && _subscription_type - == subscription_type_e::SU_PREFER_UNRELIABLE) { - send_identify_message(_client, _service, _instance, _major, - true); - } - } + (void)_subscription_type; + if (!has_identified(_client, _service, _instance, false) + && !is_identifying(_client, _service, _instance, false)) { + send_identify_message(_client, _service, _instance, _major, false); } - - if (_subscription_type == subscription_type_e::SU_RELIABLE_AND_UNRELIABLE - || _subscription_type == subscription_type_e::SU_PREFER_RELIABLE - || _subscription_type == subscription_type_e::SU_RELIABLE) { - if (!has_identified(_client, _service, _instance, true) - && !is_identifying(_client, _service, _instance, true)) { - if (!send_identify_message(_client, _service, _instance, _major, - true) && _subscription_type - == subscription_type_e::SU_PREFER_RELIABLE) { - send_identify_message(_client, _service, _instance, _major, - false); - } - } + if (!has_identified(_client, _service, _instance, true) + && !is_identifying(_client, _service, _instance, true)) { + send_identify_message(_client, _service, _instance, _major, true); } } diff --git a/implementation/routing/src/routing_manager_proxy.cpp b/implementation/routing/src/routing_manager_proxy.cpp index a15ae2d..f6aa420 100644 --- a/implementation/routing/src/routing_manager_proxy.cpp +++ b/implementation/routing/src/routing_manager_proxy.cpp @@ -363,12 +363,15 @@ void routing_manager_proxy::register_event(client_t _client, _event, _eventgroups, _is_field, _is_provided); } } - VSOMEIP_INFO << "REGISTER EVENT(" - << std::hex << std::setw(4) << std::setfill('0') << _client << "): [" - << std::hex << std::setw(4) << std::setfill('0') << _service << "." - << std::hex << std::setw(4) << std::setfill('0') << _instance << "." - << std::hex << std::setw(4) << std::setfill('0') << _event - << ":is_provider=" << _is_provided << "]"; + + if(_is_provided) { + VSOMEIP_INFO << "REGISTER EVENT(" + << std::hex << std::setw(4) << std::setfill('0') << _client << "): [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _event + << ":is_provider=" << _is_provided << "]"; + } } void routing_manager_proxy::unregister_event(client_t _client, diff --git a/implementation/runtime/include/application_impl.hpp b/implementation/runtime/include/application_impl.hpp index 2d6cd82..efd7d7d 100644 --- a/implementation/runtime/include/application_impl.hpp +++ b/implementation/runtime/include/application_impl.hpp @@ -203,7 +203,6 @@ private: sync_handler(std::function<void()> _handler) : handler_(_handler), - is_dispatching_(false), service_id_(ANY_SERVICE), instance_id_(ANY_INSTANCE), method_id_(ANY_METHOD), @@ -215,7 +214,6 @@ private: method_t _method_id, session_t _session_id, eventgroup_t _eventgroup_id, handler_type_e _handler_type) : handler_(nullptr), - is_dispatching_(false), service_id_(_service_id), instance_id_(_instance_id), method_id_(_method_id), @@ -224,7 +222,6 @@ private: handler_type_(_handler_type) { } std::function<void()> handler_; - bool is_dispatching_; service_t service_id_; instance_t instance_id_; method_t method_id_; @@ -268,6 +265,8 @@ private: void main_dispatch(); void dispatch(); void invoke_handler(std::shared_ptr<sync_handler> &_handler); + std::shared_ptr<sync_handler> get_next_handler(); + void reschedule_availability_handler(const std::shared_ptr<sync_handler> &_handler); bool has_active_dispatcher(); bool is_active_dispatcher(const std::thread::id &_id); void remove_elapsed_dispatchers(); @@ -372,6 +371,7 @@ private: std::set<std::thread::id> running_dispatchers_; // Mutex to protect access to dispatchers_ & elapsed_dispatchers_ std::mutex dispatcher_mutex_; + // Condition to wakeup the dispatcher thread mutable std::condition_variable dispatcher_condition_; std::size_t max_dispatchers_; @@ -419,6 +419,9 @@ private: bool client_side_logging_; std::set<std::tuple<service_t, instance_t> > client_side_logging_filter_; + + std::map<std::pair<service_t, instance_t>, + std::deque<std::shared_ptr<sync_handler> > > availability_handlers_; }; } // namespace vsomeip diff --git a/implementation/runtime/src/application_impl.cpp b/implementation/runtime/src/application_impl.cpp index d97be8e..720ddb1 100644 --- a/implementation/runtime/src/application_impl.cpp +++ b/implementation/runtime/src/application_impl.cpp @@ -1557,13 +1557,18 @@ void application_impl::main_dispatch() { dispatcher_condition_.wait(its_lock); } } else { - while (is_dispatching_ && !handlers_.empty() && is_active_dispatcher(its_id)) { - std::shared_ptr<sync_handler> its_handler = handlers_.front(); - handlers_.pop_front(); + std::shared_ptr<sync_handler> its_handler; + while (is_dispatching_ && is_active_dispatcher(its_id) + && (its_handler = get_next_handler())) { its_lock.unlock(); invoke_handler(its_handler); + + if (!is_dispatching_) + return; + its_lock.lock(); + reschedule_availability_handler(its_handler); remove_elapsed_dispatchers(); #ifdef _WIN32 @@ -1596,14 +1601,18 @@ void application_impl::dispatch() { return; } } else { - while (is_dispatching_ && !handlers_.empty() - && is_active_dispatcher(its_id)) { - std::shared_ptr<sync_handler> its_handler = handlers_.front(); - handlers_.pop_front(); + std::shared_ptr<sync_handler> its_handler; + while (is_dispatching_ && is_active_dispatcher(its_id) + && (its_handler = get_next_handler())) { its_lock.unlock(); invoke_handler(its_handler); + + if (!is_dispatching_) + return; + its_lock.lock(); + reschedule_availability_handler(its_handler); remove_elapsed_dispatchers(); } } @@ -1616,6 +1625,72 @@ void application_impl::dispatch() { dispatcher_condition_.notify_all(); } +std::shared_ptr<application_impl::sync_handler> application_impl::get_next_handler() { + std::shared_ptr<sync_handler> its_next_handler; + while (!handlers_.empty() && !its_next_handler) { + its_next_handler = handlers_.front(); + handlers_.pop_front(); + + // Check handler + if (its_next_handler->handler_type_ == handler_type_e::AVAILABILITY) { + const std::pair<service_t, instance_t> its_si_pair = std::make_pair( + its_next_handler->service_id_, + its_next_handler->instance_id_); + auto found_si = availability_handlers_.find(its_si_pair); + if (found_si != availability_handlers_.end() + && !found_si->second.empty() + && found_si->second.front() != its_next_handler) { + found_si->second.push_back(its_next_handler); + // There is a running availability handler for this service. + // Therefore, this one must wait... + its_next_handler = nullptr; + } else { + availability_handlers_[its_si_pair].push_back(its_next_handler); + } + } else if (its_next_handler->handler_type_ == handler_type_e::MESSAGE) { + const std::pair<service_t, instance_t> its_si_pair = std::make_pair( + its_next_handler->service_id_, + its_next_handler->instance_id_); + auto found_si = availability_handlers_.find(its_si_pair); + if (found_si != availability_handlers_.end() + && found_si->second.size() > 1) { + // The message comes after the next availability handler + // Therefore, queue it to the last one + found_si->second.push_back(its_next_handler); + its_next_handler = nullptr; + } + } + } + + return its_next_handler; +} + +void application_impl::reschedule_availability_handler( + const std::shared_ptr<sync_handler> &_handler) { + if (_handler->handler_type_ == handler_type_e::AVAILABILITY) { + const std::pair<service_t, instance_t> its_si_pair = std::make_pair( + _handler->service_id_, _handler->instance_id_); + auto found_si = availability_handlers_.find(its_si_pair); + if (found_si != availability_handlers_.end()) { + if (!found_si->second.empty() + && found_si->second.front() == _handler) { + found_si->second.pop_front(); + + // If there are other availability handlers pending, schedule + // them and all handlers that were queued because of them + for (auto it = found_si->second.rbegin(); + it != found_si->second.rend(); it++) { + handlers_.push_front(*it); + } + availability_handlers_.erase(found_si); + } + return; + } + VSOMEIP_WARNING << __func__ + << ": An unknown availability handler returned!"; + } +} + void application_impl::invoke_handler(std::shared_ptr<sync_handler> &_handler) { const std::thread::id its_id = std::this_thread::get_id(); @@ -1669,11 +1744,17 @@ void application_impl::invoke_handler(std::shared_ptr<sync_handler> &_handler) { << "type=" << static_cast<std::uint32_t>(its_sync_handler->handler_type_) << " thread=" << std::hex << its_id; } - if (is_dispatching_) { - { - std::lock_guard<std::mutex> its_lock(dispatcher_mutex_); + + while (is_dispatching_ ) { + if (dispatcher_mutex_.try_lock()) { running_dispatchers_.insert(its_id); + dispatcher_mutex_.unlock(); + break; } + std::this_thread::yield(); + } + + if (is_dispatching_) { try { _handler->handler_(); } catch (const std::exception &e) { @@ -1684,9 +1765,14 @@ void application_impl::invoke_handler(std::shared_ptr<sync_handler> &_handler) { } boost::system::error_code ec; its_dispatcher_timer.cancel(ec); - if (is_dispatching_) { - std::lock_guard<std::mutex> its_lock(dispatcher_mutex_); - running_dispatchers_.erase(its_id); + + while (is_dispatching_ ) { + if (dispatcher_mutex_.try_lock()) { + running_dispatchers_.erase(its_id); + dispatcher_mutex_.unlock(); + return; + } + std::this_thread::yield(); } } @@ -1801,6 +1887,7 @@ void application_impl::shutdown() { its_dispatcher.second->detach(); } } + availability_handlers_.clear(); running_dispatchers_.clear(); elapsed_dispatchers_.clear(); dispatchers_.clear(); @@ -2033,13 +2120,13 @@ bool application_impl::check_subscription_state(service_t _service, instance_t _ void application_impl::print_blocking_call(std::shared_ptr<sync_handler> _handler) { switch (_handler->handler_type_) { case handler_type_e::AVAILABILITY: - VSOMEIP_INFO << "BLOCKING CALL AVAILABILITY(" + VSOMEIP_WARNING << "BLOCKING CALL AVAILABILITY(" << std::hex << std::setw(4) << std::setfill('0') << get_client() <<"): [" << std::hex << std::setw(4) << std::setfill('0') << _handler->service_id_ << "." << std::hex << std::setw(4) << std::setfill('0') << _handler->instance_id_ << "]"; break; case handler_type_e::MESSAGE: - VSOMEIP_INFO << "BLOCKING CALL MESSAGE(" + VSOMEIP_WARNING << "BLOCKING CALL MESSAGE(" << std::hex << std::setw(4) << std::setfill('0') << get_client() <<"): [" << std::hex << std::setw(4) << std::setfill('0') << _handler->service_id_ << "." << std::hex << std::setw(4) << std::setfill('0') << _handler->instance_id_ << "." @@ -2047,11 +2134,11 @@ void application_impl::print_blocking_call(std::shared_ptr<sync_handler> _handle << std::hex << std::setw(4) << std::setfill('0') << _handler->session_id_ << "]"; break; case handler_type_e::STATE: - VSOMEIP_INFO << "BLOCKING CALL STATE(" + VSOMEIP_WARNING << "BLOCKING CALL STATE(" << std::hex << std::setw(4) << std::setfill('0') << get_client() << ")"; break; case handler_type_e::SUBSCRIPTION: - VSOMEIP_INFO << "BLOCKING CALL SUBSCRIPTION(" + VSOMEIP_WARNING << "BLOCKING CALL SUBSCRIPTION(" << std::hex << std::setw(4) << std::setfill('0') << get_client() <<"): [" << std::hex << std::setw(4) << std::setfill('0') << _handler->service_id_ << "." << std::hex << std::setw(4) << std::setfill('0') << _handler->instance_id_ << "." @@ -2059,15 +2146,15 @@ void application_impl::print_blocking_call(std::shared_ptr<sync_handler> _handle << std::hex << std::setw(4) << std::setfill('0') << _handler->method_id_ << "]"; break; case handler_type_e::OFFERED_SERVICES_INFO: - VSOMEIP_INFO << "BLOCKING CALL OFFERED_SERVICES_INFO(" + VSOMEIP_WARNING << "BLOCKING CALL OFFERED_SERVICES_INFO(" << std::hex << std::setw(4) << std::setfill('0') << get_client() <<")"; break; case handler_type_e::WATCHDOG: - VSOMEIP_INFO << "BLOCKING CALL WATCHDOG(" + VSOMEIP_WARNING << "BLOCKING CALL WATCHDOG(" << std::hex << std::setw(4) << std::setfill('0') << get_client() <<")"; break; case handler_type_e::UNKNOWN: - VSOMEIP_INFO << "BLOCKING CALL UNKNOWN(" + VSOMEIP_WARNING << "BLOCKING CALL UNKNOWN(" << std::hex << std::setw(4) << std::setfill('0') << get_client() << ")"; break; } diff --git a/implementation/service_discovery/include/service_discovery_impl.hpp b/implementation/service_discovery/include/service_discovery_impl.hpp index 020de59..039fe7b 100644 --- a/implementation/service_discovery/include/service_discovery_impl.hpp +++ b/implementation/service_discovery/include/service_discovery_impl.hpp @@ -122,12 +122,20 @@ private: service_t _service, instance_t _instance, const std::shared_ptr<const serviceinfo> &_info, uint32_t &_size); - void insert_subscription(std::shared_ptr<message_impl> &_message, + enum remote_offer_type_e : std::uint8_t { + RELIABLE_UNRELIABLE, + RELIABLE, + UNRELIABLE, + UNKNOWN = 0xff + }; + bool insert_subscription(std::shared_ptr<message_impl> &_message, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - std::shared_ptr<subscription> &_subscription, bool _insert_reliable, bool _insert_unreliable); - void insert_nack_subscription_on_resubscribe(std::shared_ptr<message_impl> &_message, + std::shared_ptr<subscription> &_subscription, + remote_offer_type_e _offer_type); + bool insert_nack_subscription_on_resubscribe(std::shared_ptr<message_impl> &_message, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - std::shared_ptr<subscription> &_subscription); + std::shared_ptr<subscription> &_subscription, + remote_offer_type_e _offer_type); void insert_subscription_ack(std::shared_ptr<message_impl> &_message, service_t _service, instance_t _instance, eventgroup_t _eventgroup, const std::shared_ptr<eventgroupinfo> &_info, ttl_t _ttl, @@ -206,8 +214,7 @@ private: const std::shared_ptr<const message> &_message) const; bool check_layer_four_protocol( const std::shared_ptr<const ip_option_impl> _ip_option) const; - void get_subscription_endpoints(subscription_type_e _subscription_type, - std::shared_ptr<endpoint>& _unreliable, + void get_subscription_endpoints(std::shared_ptr<endpoint>& _unreliable, std::shared_ptr<endpoint>& _reliable, boost::asio::ip::address* _address, bool* _has_address, @@ -322,6 +329,16 @@ private: void on_last_msg_received_timer_expired(const boost::system::error_code &_error); void stop_last_msg_received_timer(); + + remote_offer_type_e get_remote_offer_type(service_t _service, instance_t _instance); + bool update_remote_offer_type(service_t _service, instance_t _instance, + remote_offer_type_e _offer_type, + const boost::asio::ip::address &_reliable_address, + const boost::asio::ip::address &_unreliable_address); + void remove_remote_offer_type(service_t _service, instance_t _instance, + const boost::asio::ip::address &_address); + void remove_remote_offer_type_by_ip(const boost::asio::ip::address &_address); + private: boost::asio::io_service &io_; service_discovery_host *host_; @@ -417,6 +434,10 @@ private: std::mutex last_msg_received_timer_mutex_; boost::asio::steady_timer last_msg_received_timer_; std::chrono::milliseconds last_msg_received_timer_timeout_; + + std::mutex remote_offer_types_mutex_; + std::map<std::pair<service_t, instance_t>, remote_offer_type_e> remote_offer_types_; + std::map<boost::asio::ip::address, std::set<std::pair<service_t, instance_t>>> remote_offers_by_ip_; }; } // namespace sd diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp index d617e0c..2d5aaff 100644 --- a/implementation/service_discovery/src/service_discovery_impl.cpp +++ b/implementation/service_discovery/src/service_discovery_impl.cpp @@ -258,7 +258,7 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance, bool has_address(false); boost::asio::ip::address its_address; - get_subscription_endpoints(_subscription_type, its_unreliable, its_reliable, + get_subscription_endpoints(its_unreliable, its_reliable, &its_address, &has_address, _service, _instance, _client); std::shared_ptr<runtime> its_runtime = runtime_.lock(); @@ -286,41 +286,47 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance, } } - if (its_subscription->get_endpoint(true) && + const remote_offer_type_e its_offer_type = get_remote_offer_type( + _service, _instance); + + if (its_offer_type == remote_offer_type_e::UNRELIABLE && + !its_subscription->get_endpoint(true) && its_subscription->get_endpoint(false)) { - if (its_subscription->get_endpoint(true)->is_connected() && - its_subscription->get_endpoint(false)->is_connected()) { + if (its_subscription->get_endpoint(false)->is_connected()) { insert_subscription(its_message, _service, _instance, _eventgroup, - its_subscription, true, true); + its_subscription, its_offer_type); } else { - if (!its_subscription->get_endpoint(true)->is_connected()) { - its_subscription->set_tcp_connection_established(false); - } - if (!its_subscription->get_endpoint(false)->is_connected()) { - its_subscription->set_udp_connection_established(false); - } + its_subscription->set_udp_connection_established(false); } - } else if (its_subscription->get_endpoint(true) && + } else if (its_offer_type == remote_offer_type_e::RELIABLE && + its_subscription->get_endpoint(true) && !its_subscription->get_endpoint(false)) { if (its_subscription->get_endpoint(true)->is_connected()) { insert_subscription(its_message, _service, _instance, _eventgroup, - its_subscription, true, false); + its_subscription, its_offer_type); } else { its_subscription->set_tcp_connection_established(false); } - } else if (!its_subscription->get_endpoint(true) && + } else if (its_offer_type == remote_offer_type_e::RELIABLE_UNRELIABLE && + its_subscription->get_endpoint(true) && its_subscription->get_endpoint(false)) { - if (its_subscription->get_endpoint(false)->is_connected()) { + if (its_subscription->get_endpoint(true)->is_connected() && + its_subscription->get_endpoint(false)->is_connected()) { insert_subscription(its_message, _service, _instance, _eventgroup, - its_subscription, false, true); + its_subscription, its_offer_type); } else { - its_subscription->set_udp_connection_established(false); + if (!its_subscription->get_endpoint(true)->is_connected()) { + its_subscription->set_tcp_connection_established(false); + } + if (!its_subscription->get_endpoint(false)->is_connected()) { + its_subscription->set_udp_connection_established(false); + } } } @@ -336,107 +342,30 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance, } void service_discovery_impl::get_subscription_endpoints( - subscription_type_e _subscription_type, std::shared_ptr<endpoint>& _unreliable, std::shared_ptr<endpoint>& _reliable, boost::asio::ip::address* _address, bool* _has_address, service_t _service, instance_t _instance, client_t _client) const { - switch (_subscription_type) { - case subscription_type_e::SU_RELIABLE_AND_UNRELIABLE: - _reliable = host_->find_or_create_remote_client(_service, _instance, - true, _client); - _unreliable = host_->find_or_create_remote_client(_service, - _instance, false, _client); - if (_unreliable) { - std::shared_ptr<client_endpoint> its_client_endpoint = - std::dynamic_pointer_cast<client_endpoint>(_unreliable); - if (its_client_endpoint) { - *_has_address = its_client_endpoint->get_remote_address( + _reliable = host_->find_or_create_remote_client(_service, _instance, + true, _client); + _unreliable = host_->find_or_create_remote_client(_service, + _instance, false, _client); + if (_unreliable) { + std::shared_ptr<client_endpoint> its_client_endpoint = + std::dynamic_pointer_cast<client_endpoint>(_unreliable); + if (its_client_endpoint) { + *_has_address = its_client_endpoint->get_remote_address( + *_address); + } + } + if (_reliable) { + std::shared_ptr<client_endpoint> its_client_endpoint = + std::dynamic_pointer_cast<client_endpoint>(_reliable); + if (its_client_endpoint) { + *_has_address = *_has_address + || its_client_endpoint->get_remote_address( *_address); - } - } - if (_reliable) { - std::shared_ptr<client_endpoint> its_client_endpoint = - std::dynamic_pointer_cast<client_endpoint>(_reliable); - if (its_client_endpoint) { - *_has_address = *_has_address - || its_client_endpoint->get_remote_address( - *_address); - } - } - break; - case subscription_type_e::SU_PREFER_UNRELIABLE: - _unreliable = host_->find_or_create_remote_client(_service, - _instance, false, _client); - if (_unreliable) { - std::shared_ptr<client_endpoint> its_client_endpoint = - std::dynamic_pointer_cast<client_endpoint>(_unreliable); - if (its_client_endpoint) { - *_has_address = its_client_endpoint->get_remote_address( - *_address); - } - } else { - _reliable = host_->find_or_create_remote_client(_service, - _instance, true, _client); - if (_reliable) { - std::shared_ptr<client_endpoint> its_client_endpoint = - std::dynamic_pointer_cast<client_endpoint>( - _reliable); - if (its_client_endpoint) { - *_has_address = its_client_endpoint->get_remote_address( - *_address); - } - } - } - break; - case subscription_type_e::SU_PREFER_RELIABLE: - _reliable = host_->find_or_create_remote_client(_service, - _instance, true, _client); - if (_reliable) { - std::shared_ptr<client_endpoint> its_client_endpoint = - std::dynamic_pointer_cast<client_endpoint>(_reliable); - if (its_client_endpoint) { - *_has_address = its_client_endpoint->get_remote_address( - *_address); - } - } else { - _unreliable = host_->find_or_create_remote_client(_service, - _instance, false, _client); - if (_unreliable) { - std::shared_ptr<client_endpoint> its_client_endpoint = - std::dynamic_pointer_cast<client_endpoint>( - _unreliable); - if (its_client_endpoint) { - *_has_address = its_client_endpoint->get_remote_address( - *_address); - } - } - } - break; - case subscription_type_e::SU_UNRELIABLE: - _unreliable = host_->find_or_create_remote_client(_service, - _instance, - false, _client); - if (_unreliable) { - std::shared_ptr<client_endpoint> its_client_endpoint = - std::dynamic_pointer_cast<client_endpoint>(_unreliable); - if (its_client_endpoint) { - *_has_address = its_client_endpoint->get_remote_address( - *_address); - } - } - break; - case subscription_type_e::SU_RELIABLE: - _reliable = host_->find_or_create_remote_client(_service, _instance, - true, _client); - if (_reliable) { - std::shared_ptr<client_endpoint> its_client_endpoint = - std::dynamic_pointer_cast<client_endpoint>(_reliable); - if (its_client_endpoint) { - *_has_address = its_client_endpoint->get_remote_address( - *_address); - } - } + } } } @@ -456,6 +385,8 @@ void service_discovery_impl::unsubscribe(service_t _service, if (found_service != subscribed_.end()) { auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { + const remote_offer_type_e its_offer_type = get_remote_offer_type( + _service, _instance); auto found_eventgroup = found_instance->second.find(_eventgroup); if (found_eventgroup != found_instance->second.end()) { auto found_client = found_eventgroup->second.find(_client); @@ -488,8 +419,8 @@ void service_discovery_impl::unsubscribe(service_t _service, return; } } - insert_subscription(its_message, _service, _instance, _eventgroup, - its_subscription, true, true); + insert_subscription(its_message, _service, _instance, + _eventgroup, its_subscription, its_offer_type); } } } @@ -535,6 +466,8 @@ void service_discovery_impl::unsubscribe_client(service_t _service, if (found_service != subscribed_.end()) { auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { + const remote_offer_type_e its_offer_type = get_remote_offer_type( + _service, _instance); for (auto &found_eventgroup : found_instance->second) { auto found_client = found_eventgroup.second.find(_client); if (found_client != found_eventgroup.second.end()) { @@ -570,8 +503,7 @@ void service_discovery_impl::unsubscribe_client(service_t _service, } } insert_subscription(its_message, _service, _instance, - found_eventgroup.first, its_subscription, true, - true); + found_eventgroup.first, its_subscription, its_offer_type); } } } @@ -895,117 +827,207 @@ void service_discovery_impl::insert_offer_entries( _done = true; } -void service_discovery_impl::insert_subscription( +bool service_discovery_impl::insert_subscription( std::shared_ptr<message_impl> &_message, service_t _service, instance_t _instance, eventgroup_t _eventgroup, std::shared_ptr<subscription> &_subscription, - bool _insert_reliable, bool _insert_unreliable) { - if((_insert_reliable && !_insert_unreliable && !_subscription->get_endpoint(true)) || - (_insert_unreliable && !_insert_reliable && !_subscription->get_endpoint(false))) { - // don't create an eventgroup entry if there isn't an endpoint option - // to insert - return; - } - std::shared_ptr < eventgroupentry_impl > its_entry = - _message->create_eventgroup_entry(); - its_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP); - its_entry->set_service(_service); - its_entry->set_instance(_instance); - its_entry->set_eventgroup(_eventgroup); - its_entry->set_counter(_subscription->get_counter()); - its_entry->set_major_version(_subscription->get_major()); - its_entry->set_ttl(_subscription->get_ttl()); - std::shared_ptr < endpoint > its_endpoint; - if (_insert_reliable) { - its_endpoint = _subscription->get_endpoint(true); - if (its_endpoint) { - const std::uint16_t its_port = its_endpoint->get_local_port(); - if (its_port) { - insert_option(_message, its_entry, unicast_, its_port, true); - } else { - VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as " - "local reliable port is zero: [" - << std::hex << std::setw(4) << std::setfill('0') << _service << "." - << std::hex << std::setw(4) << std::setfill('0') << _instance << "." - << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; + remote_offer_type_e _offer_type) { + bool ret(false); + std::shared_ptr<endpoint> its_reliable_endpoint(_subscription->get_endpoint(true)); + std::shared_ptr<endpoint> its_unreliable_endpoint(_subscription->get_endpoint(false)); + + bool insert_reliable(false); + bool insert_unreliable(false); + switch (_offer_type) { + case remote_offer_type_e::RELIABLE: + if (its_reliable_endpoint) { + insert_reliable = true; } - } - } - if (_insert_unreliable) { - its_endpoint = _subscription->get_endpoint(false); - if (its_endpoint) { - const std::uint16_t its_port = its_endpoint->get_local_port(); - if (its_port) { - insert_option(_message, its_entry, unicast_, its_port, false); - } else { - VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as " - " local unreliable port is zero: [" - << std::hex << std::setw(4) << std::setfill('0') << _service << "." - << std::hex << std::setw(4) << std::setfill('0') << _instance << "." - << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; + break; + case remote_offer_type_e::UNRELIABLE: + if (its_unreliable_endpoint) { + insert_unreliable = true; } - } + break; + case remote_offer_type_e::RELIABLE_UNRELIABLE: + if (its_reliable_endpoint && its_unreliable_endpoint) { + insert_reliable = true; + insert_unreliable = true; + } + break; + default: + break; } -} - -void service_discovery_impl::insert_nack_subscription_on_resubscribe(std::shared_ptr<message_impl> &_message, - service_t _service, instance_t _instance, eventgroup_t _eventgroup, - std::shared_ptr<subscription> &_subscription) { - // SIP_SD_844: - // This method is used for not acknowledged subscriptions on renew subscription - // Two entries: Stop subscribe & subscribe within one SD-Message - // One option: Both entries reference it - - std::shared_ptr < eventgroupentry_impl > its_stop_entry = - _message->create_eventgroup_entry(); - its_stop_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP); - its_stop_entry->set_service(_service); - its_stop_entry->set_instance(_instance); - its_stop_entry->set_eventgroup(_eventgroup); - its_stop_entry->set_counter(_subscription->get_counter()); - its_stop_entry->set_major_version(_subscription->get_major()); - its_stop_entry->set_ttl(0); - - std::shared_ptr < eventgroupentry_impl > its_entry = - _message->create_eventgroup_entry(); - its_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP); - its_entry->set_service(_service); - its_entry->set_instance(_instance); - its_entry->set_eventgroup(_eventgroup); - its_entry->set_counter(_subscription->get_counter()); - its_entry->set_major_version(_subscription->get_major()); - its_entry->set_ttl(_subscription->get_ttl()); - - std::shared_ptr < endpoint > its_endpoint; - its_endpoint = _subscription->get_endpoint(true); - if (its_endpoint && its_endpoint->is_connected()) { - const std::uint16_t its_port = its_endpoint->get_local_port(); + if (!insert_reliable && !insert_unreliable) { + VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as " + "subscription doesn't match offer type: [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "] " + << _offer_type; + return false; + } + std::shared_ptr<eventgroupentry_impl> its_entry; + if (insert_reliable && its_reliable_endpoint) { + const std::uint16_t its_port = its_reliable_endpoint->get_local_port(); if (its_port) { - insert_option(_message, its_stop_entry, unicast_, its_port, true); + its_entry = _message->create_eventgroup_entry(); + its_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP); + its_entry->set_service(_service); + its_entry->set_instance(_instance); + its_entry->set_eventgroup(_eventgroup); + its_entry->set_counter(_subscription->get_counter()); + its_entry->set_major_version(_subscription->get_major()); + its_entry->set_ttl(_subscription->get_ttl()); insert_option(_message, its_entry, unicast_, its_port, true); + ret = true; } else { VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as " "local reliable port is zero: [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "." << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; + ret = false; } } - its_endpoint = _subscription->get_endpoint(false); - if (its_endpoint) { - const std::uint16_t its_port = its_endpoint->get_local_port(); + if (insert_unreliable && its_unreliable_endpoint) { + const std::uint16_t its_port = its_unreliable_endpoint->get_local_port(); if (its_port) { - insert_option(_message, its_stop_entry, unicast_, its_port, false); + if (!its_entry) { + its_entry = _message->create_eventgroup_entry(); + its_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP); + its_entry->set_service(_service); + its_entry->set_instance(_instance); + its_entry->set_eventgroup(_eventgroup); + its_entry->set_counter(_subscription->get_counter()); + its_entry->set_major_version(_subscription->get_major()); + its_entry->set_ttl(_subscription->get_ttl()); + } insert_option(_message, its_entry, unicast_, its_port, false); + ret = true; } else { VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as " - "local unreliable port is zero: [" + " local unreliable port is zero: [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _instance << "." << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; + ret = false; } } + return ret; +} + +bool service_discovery_impl::insert_nack_subscription_on_resubscribe(std::shared_ptr<message_impl> &_message, + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + std::shared_ptr<subscription> &_subscription, remote_offer_type_e _offer_type) { + bool ret(false); + // SIP_SD_844: + // This method is used for not acknowledged subscriptions on renew subscription + // Two entries: Stop subscribe & subscribe within one SD-Message + // One option: Both entries reference it + + const std::function<std::shared_ptr<eventgroupentry_impl>(ttl_t)> insert_entry + = [&](ttl_t _ttl) { + std::shared_ptr<eventgroupentry_impl> its_entry = + _message->create_eventgroup_entry(); + // SUBSCRIBE_EVENTGROUP and STOP_SUBSCRIBE_EVENTGROUP are identical + its_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP); + its_entry->set_service(_service); + its_entry->set_instance(_instance); + its_entry->set_eventgroup(_eventgroup); + its_entry->set_counter(_subscription->get_counter()); + its_entry->set_major_version(_subscription->get_major()); + its_entry->set_ttl(_ttl); + return its_entry; + }; + + std::shared_ptr<endpoint> its_reliable_endpoint(_subscription->get_endpoint(true)); + std::shared_ptr<endpoint> its_unreliable_endpoint(_subscription->get_endpoint(false)); + + if (_offer_type == remote_offer_type_e::UNRELIABLE && + !its_reliable_endpoint && its_unreliable_endpoint) { + if (its_unreliable_endpoint->is_connected()) { + const std::uint16_t its_port = its_unreliable_endpoint->get_local_port(); + if (its_port) { + std::shared_ptr<eventgroupentry_impl> its_stop_entry = insert_entry(0); + std::shared_ptr<eventgroupentry_impl> its_entry = insert_entry(_subscription->get_ttl()); + insert_option(_message, its_stop_entry, unicast_, its_port, false); + insert_option(_message, its_entry, unicast_, its_port, false); + ret = true; + } else { + VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as " + "local unreliable port is zero: [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; + } + } else { + _subscription->set_udp_connection_established(false); + } + } else if (_offer_type == remote_offer_type_e::RELIABLE && + its_reliable_endpoint && !its_unreliable_endpoint) { + if (its_reliable_endpoint->is_connected()) { + const std::uint16_t its_port = its_reliable_endpoint->get_local_port(); + if (its_port) { + std::shared_ptr<eventgroupentry_impl> its_stop_entry = insert_entry(0); + std::shared_ptr<eventgroupentry_impl> its_entry = insert_entry(_subscription->get_ttl()); + insert_option(_message, its_stop_entry, unicast_, its_port, true); + insert_option(_message, its_entry, unicast_, its_port, true); + ret = true; + } else { + VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as " + "local reliable port is zero: [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; + } + } else { + _subscription->set_tcp_connection_established(false); + } + } else if (_offer_type == remote_offer_type_e::RELIABLE_UNRELIABLE && + its_reliable_endpoint && its_unreliable_endpoint) { + if (its_reliable_endpoint->is_connected() && + its_unreliable_endpoint->is_connected()) { + const std::uint16_t its_reliable_port = its_reliable_endpoint->get_local_port(); + const std::uint16_t its_unreliable_port = its_unreliable_endpoint->get_local_port(); + if (its_reliable_port && its_unreliable_port) { + std::shared_ptr<eventgroupentry_impl> its_stop_entry = insert_entry(0); + std::shared_ptr<eventgroupentry_impl> its_entry = insert_entry(_subscription->get_ttl()); + insert_option(_message, its_stop_entry, unicast_, its_reliable_port, true); + insert_option(_message, its_entry, unicast_, its_reliable_port, true); + insert_option(_message, its_stop_entry, unicast_, its_unreliable_port, false); + insert_option(_message, its_entry, unicast_, its_unreliable_port, false); + ret = true; + } else if (!its_reliable_port) { + VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as " + "local reliable port is zero: [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; + } else if (!its_unreliable_port) { + VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as " + "local unreliable port is zero: [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; + } + } else { + if (!its_reliable_endpoint->is_connected()) { + _subscription->set_tcp_connection_established(false); + } + if (!its_unreliable_endpoint->is_connected()) { + _subscription->set_udp_connection_established(false); + } + } + } else { + VSOMEIP_WARNING << __func__ << ": Couldn't insert StopSubscribe/Subscribe [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "] " + << _offer_type; + } + return ret; } void service_discovery_impl::insert_subscription_ack( @@ -1151,6 +1173,7 @@ void service_discovery_impl::on_message(const byte_t *_data, length_t _length, if (is_reboot(_sender, _destination, its_message->get_reboot_flag(), its_message->get_session())) { VSOMEIP_INFO << "Reboot detected: IP=" << _sender.to_string(); + remove_remote_offer_type_by_ip(_sender); host_->expire_subscriptions(_sender); host_->expire_services(_sender); } @@ -1341,6 +1364,9 @@ void service_discovery_impl::process_serviceentry( // ID: SIP_SD_830 its_request->set_sent_counter(std::uint8_t(repetitions_max_ + 1)); } + remove_remote_offer_type(its_service, its_instance, + (its_reliable_port != ILLEGAL_PORT ? + its_reliable_address : its_unreliable_address)); unsubscribe_all(its_service, its_instance); if (!is_diagnosis_ && !is_suspended_) { host_->del_routing_info(its_service, its_instance, @@ -1367,6 +1393,31 @@ void service_discovery_impl::process_offerservice_serviceentry( std::lock_guard<std::mutex> its_lock(requested_mutex_); its_request->set_sent_counter(std::uint8_t(repetitions_max_ + 1)); } + remote_offer_type_e offer_type(remote_offer_type_e::UNKNOWN); + if (_reliable_port != ILLEGAL_PORT + && _unreliable_port != ILLEGAL_PORT + && !_reliable_address.is_unspecified() + && !_unreliable_address.is_unspecified()) { + offer_type = remote_offer_type_e::RELIABLE_UNRELIABLE; + } else if (_unreliable_port != ILLEGAL_PORT + && !_unreliable_address.is_unspecified()) { + offer_type = remote_offer_type_e::UNRELIABLE; + } else if (_reliable_port != ILLEGAL_PORT + && !_reliable_address.is_unspecified()) { + offer_type = remote_offer_type_e::RELIABLE; + } else { + VSOMEIP_WARNING << __func__ << ": unknown remote offer type [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "]"; + } + + if (update_remote_offer_type(_service,_instance, offer_type, + _reliable_address, _unreliable_address)) { + VSOMEIP_WARNING << __func__ << ": Remote offer type changed [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "]"; + } + host_->add_routing_info(_service, _instance, _major, _minor, @@ -1392,6 +1443,8 @@ void service_discovery_impl::process_offerservice_serviceentry( auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { if (0 < found_instance->second.size()) { + const remote_offer_type_e its_offer_type = + get_remote_offer_type(_service, _instance); for (auto its_eventgroup : found_instance->second) { for (auto its_client : its_eventgroup.second) { std::shared_ptr<subscription> its_subscription(its_client.second); @@ -1400,7 +1453,6 @@ void service_discovery_impl::process_offerservice_serviceentry( bool has_address(false); boost::asio::ip::address its_address; get_subscription_endpoints( - its_client.second->get_subscription_type(), its_unreliable, its_reliable, &its_address, &has_address, _service, _instance, its_client.first); @@ -1420,66 +1472,113 @@ void service_discovery_impl::process_offerservice_serviceentry( } if (its_subscription->is_acknowledged()) { - if (its_subscription->get_endpoint(true) - && its_subscription->get_endpoint(true)->is_connected()) { - // 40 = 16 (subscription) + 2x12 (option) - check_space(40); - const std::size_t options_size_before = - _resubscribes->back().second->get_options().size(); - insert_subscription(_resubscribes->back().second, - _service, _instance, - its_eventgroup.first, - its_subscription, true, true); - const std::size_t options_size_after = - _resubscribes->back().second->get_options().size(); - const std::size_t diff = options_size_after - options_size_before; - _resubscribes->back().first = - static_cast<std::uint16_t>( - _resubscribes->back().first + (12u * diff - 24u)); - } else { - // don't insert reliable endpoint option if the - // TCP client endpoint is not yet connected + if (its_offer_type == remote_offer_type_e::UNRELIABLE && + its_unreliable && its_unreliable->is_connected()) { // 28 = 16 (subscription) + 12 (option) check_space(28); const std::size_t options_size_before = _resubscribes->back().second->get_options().size(); - insert_subscription(_resubscribes->back().second, + if (insert_subscription(_resubscribes->back().second, _service, _instance, its_eventgroup.first, - its_subscription, false, true); - its_client.second->set_tcp_connection_established(false); - const std::size_t options_size_after = - _resubscribes->back().second->get_options().size(); - const std::size_t diff = options_size_after - options_size_before; - _resubscribes->back().first = - static_cast<std::uint16_t>( - _resubscribes->back().first + (12u * diff - 12u)); - // restart TCP endpoint if not connected - if (its_subscription->get_endpoint(true) - && !its_subscription->get_endpoint(true)->is_connected()) { - its_subscription->get_endpoint(true)->restart(); + its_subscription, its_offer_type)) { + its_subscription->set_acknowledged(false); + const std::size_t options_size_after = + _resubscribes->back().second->get_options().size(); + const std::size_t diff = options_size_after - options_size_before; + _resubscribes->back().first = + static_cast<std::uint16_t>( + _resubscribes->back().first + (12u * diff - 12u)); + } else { + _resubscribes->back().first = + static_cast<std::uint16_t>( + _resubscribes->back().first - 28); + } + } else if (its_offer_type == remote_offer_type_e::RELIABLE && + its_reliable) { + if (its_reliable->is_connected()) { + // 28 = 16 (subscription) + 12 (option) + check_space(28); + const std::size_t options_size_before = + _resubscribes->back().second->get_options().size(); + if (insert_subscription(_resubscribes->back().second, + _service, _instance, + its_eventgroup.first, + its_subscription, its_offer_type)) { + its_subscription->set_acknowledged(false); + const std::size_t options_size_after = + _resubscribes->back().second->get_options().size(); + const std::size_t diff = options_size_after - options_size_before; + _resubscribes->back().first = + static_cast<std::uint16_t>( + _resubscribes->back().first + (12u * diff - 12u)); + } else { + _resubscribes->back().first = + static_cast<std::uint16_t>( + _resubscribes->back().first - 28); + } + } else { + its_client.second->set_tcp_connection_established(false); + // restart TCP endpoint if not connected + its_reliable->restart(); + } + } else if (its_offer_type == remote_offer_type_e::RELIABLE_UNRELIABLE) { + if (its_reliable && its_unreliable && + its_reliable->is_connected() && + its_unreliable->is_connected()) { + // 40 = 16 (subscription) + 2x12 (option) + check_space(40); + const std::size_t options_size_before = + _resubscribes->back().second->get_options().size(); + if (insert_subscription(_resubscribes->back().second, + _service, _instance, + its_eventgroup.first, + its_subscription, its_offer_type)) { + its_subscription->set_acknowledged(false); + const std::size_t options_size_after = + _resubscribes->back().second->get_options().size(); + const std::size_t diff = options_size_after - options_size_before; + _resubscribes->back().first = + static_cast<std::uint16_t>( + _resubscribes->back().first + (12u * diff - 24u)); + } else { + _resubscribes->back().first = + static_cast<std::uint16_t>( + _resubscribes->back().first - 40); + } + } else if (its_reliable && !its_reliable->is_connected()) { + its_client.second->set_tcp_connection_established(false); + // restart TCP endpoint if not connected + its_reliable->restart(); } + } else { + VSOMEIP_WARNING << __func__ << ": unknown remote offer type [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "]"; } - its_subscription->set_acknowledged(false); } else { // 56 = 2x16 (subscription) + 2x12 (option) check_space(56); const std::size_t options_size_before = _resubscribes->back().second->get_options().size(); - insert_nack_subscription_on_resubscribe(_resubscribes->back().second, + if (insert_nack_subscription_on_resubscribe(_resubscribes->back().second, _service, _instance, its_eventgroup.first, - its_subscription); - const std::size_t options_size_after = - _resubscribes->back().second->get_options().size(); - const std::size_t diff = options_size_after - options_size_before; - _resubscribes->back().first = - static_cast<std::uint16_t>( - _resubscribes->back().first + (12u * diff - 24u)); + its_subscription, its_offer_type) ) { + const std::size_t options_size_after = + _resubscribes->back().second->get_options().size(); + const std::size_t diff = options_size_after - options_size_before; + _resubscribes->back().first = + static_cast<std::uint16_t>( + _resubscribes->back().first + (12u * diff - 24u)); + } else { + _resubscribes->back().first = + static_cast<std::uint16_t>( + _resubscribes->back().first - 56u); + } // restart TCP endpoint if not connected - if (its_subscription->get_endpoint(true) - && !its_subscription->get_endpoint(true)->is_connected()) { - its_subscription->get_endpoint(true)->restart(); + if (its_reliable && !its_reliable->is_connected()) { + its_reliable->restart(); } } } @@ -1592,7 +1691,8 @@ void service_discovery_impl::on_endpoint_connected( auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { if(0 < found_instance->second.size()) { - + const remote_offer_type_e its_offer_type = + get_remote_offer_type(_service, _instance); for(const auto &its_eventgroup : found_instance->second) { for(const auto &its_client : its_eventgroup.second) { if (its_client.first != VSOMEIP_ROUTING_CLIENT) { @@ -1637,7 +1737,6 @@ void service_discovery_impl::on_endpoint_connected( std::shared_ptr<endpoint> its_unreliable; std::shared_ptr<endpoint> its_reliable; get_subscription_endpoints( - its_subscription->get_subscription_type(), its_unreliable, its_reliable, &its_address, &has_address, _service, _instance, its_client.first); @@ -1645,7 +1744,7 @@ void service_discovery_impl::on_endpoint_connected( its_subscription->set_endpoint(its_unreliable, false); insert_subscription(its_message, _service, _instance, its_eventgroup.first, - its_subscription, true, true); + its_subscription, its_offer_type); its_subscription->set_acknowledged(false); } } @@ -2482,6 +2581,8 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _ if (found_service != subscribed_.end()) { auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { + const remote_offer_type_e its_offer_type = + get_remote_offer_type(_service, _instance); for (auto found_eventgroup : found_instance->second) { auto found_client = found_eventgroup.second.find(_client); if (found_client != found_eventgroup.second.end()) { @@ -2490,7 +2591,6 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _ bool has_address(false); boost::asio::ip::address its_address; get_subscription_endpoints( - found_client->second->get_subscription_type(), its_unreliable, its_reliable, &its_address, &has_address, _service, _instance, found_client->first); @@ -2526,7 +2626,7 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _ if (its_reliable->is_connected() && its_unreliable->is_connected()) { insert_subscription(its_message, _service, _instance, found_eventgroup.first, - found_client->second, true, true); + found_client->second, its_offer_type); found_client->second->set_tcp_connection_established(true); found_client->second->set_udp_connection_established(true); } else { @@ -2538,7 +2638,7 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _ if(endpoint->is_connected()) { insert_subscription(its_message, _service, _instance, found_eventgroup.first, - found_client->second, _reliable, !_reliable); + found_client->second, its_offer_type); found_client->second->set_tcp_connection_established(true); } else { // don't insert reliable endpoint option if the @@ -2549,7 +2649,7 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _ if (endpoint->is_connected()) { insert_subscription(its_message, _service, _instance, found_eventgroup.first, - found_client->second, _reliable, !_reliable); + found_client->second, its_offer_type); found_client->second->set_udp_connection_established(true); } else { // don't insert unreliable endpoint option if the @@ -3517,5 +3617,78 @@ void service_discovery_impl::stop_last_msg_received_timer() { last_msg_received_timer_.cancel(ec); } +service_discovery_impl::remote_offer_type_e service_discovery_impl::get_remote_offer_type( + service_t _service, instance_t _instance) { + std::lock_guard<std::mutex> its_lock(remote_offer_types_mutex_); + auto found_si = remote_offer_types_.find(std::make_pair(_service, _instance)); + if (found_si != remote_offer_types_.end()) { + return found_si->second; + } + return remote_offer_type_e::UNKNOWN; +} + +bool service_discovery_impl::update_remote_offer_type( + service_t _service, instance_t _instance, + remote_offer_type_e _offer_type, + const boost::asio::ip::address &_reliable_address, + const boost::asio::ip::address &_unreliable_address) { + bool ret(false); + std::lock_guard<std::mutex> its_lock(remote_offer_types_mutex_); + const std::pair<service_t, instance_t> its_si_pair = std::make_pair(_service, _instance); + auto found_si = remote_offer_types_.find(its_si_pair); + if (found_si != remote_offer_types_.end()) { + if (found_si->second != _offer_type ) { + found_si->second = _offer_type; + ret = true; + } + } else { + remote_offer_types_[its_si_pair] = _offer_type; + } + switch (_offer_type) { + case remote_offer_type_e::UNRELIABLE: + remote_offers_by_ip_[_unreliable_address].insert(its_si_pair); + break; + case remote_offer_type_e::RELIABLE: + remote_offers_by_ip_[_reliable_address].insert(its_si_pair); + break; + case remote_offer_type_e::RELIABLE_UNRELIABLE: + remote_offers_by_ip_[_unreliable_address].insert(its_si_pair); + break; + case remote_offer_type_e::UNKNOWN: + default: + VSOMEIP_WARNING << __func__ << ": unkown offer type [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "]" + << _offer_type; + break; + } + return ret; +} + +void service_discovery_impl::remove_remote_offer_type( + service_t _service, instance_t _instance, + const boost::asio::ip::address &_address) { + std::lock_guard<std::mutex> its_lock(remote_offer_types_mutex_); + const std::pair<service_t, instance_t> its_si_pair = + std::make_pair(_service, _instance); + remote_offer_types_.erase(its_si_pair); + auto found_services = remote_offers_by_ip_.find(_address); + if (found_services != remote_offers_by_ip_.end()) { + found_services->second.erase(its_si_pair); + } +} + +void service_discovery_impl::remove_remote_offer_type_by_ip( + const boost::asio::ip::address &_address) { + std::lock_guard<std::mutex> its_lock(remote_offer_types_mutex_); + auto found_services = remote_offers_by_ip_.find(_address); + if (found_services != remote_offers_by_ip_.end()) { + for (const auto& si : found_services->second) { + remote_offer_types_.erase(si); + } + } + remote_offers_by_ip_.erase(_address); +} + } // namespace sd } // namespace vsomeip |