summaryrefslogtreecommitdiff
path: root/implementation
diff options
context:
space:
mode:
authorJuergen Gehring <juergen.gehring@bmw.de>2018-01-25 00:40:08 -0800
committerJuergen Gehring <juergen.gehring@bmw.de>2018-01-25 00:40:08 -0800
commit565b97b0108a02ef41284629a6d226c053c0dd7e (patch)
tree5a0b28ba67e3ac8d4cf349ea3a464d8cfd4a5ac1 /implementation
parent8936891b5db1a0c894a3ec0af52c081b52cca46c (diff)
downloadvSomeIP-565b97b0108a02ef41284629a6d226c053c0dd7e.tar.gz
vsomeip 2.10.82.10.8
Diffstat (limited to 'implementation')
-rw-r--r--implementation/configuration/include/policy.hpp6
-rw-r--r--implementation/configuration/src/configuration_impl.cpp25
-rw-r--r--implementation/endpoints/include/client_endpoint_impl.hpp7
-rw-r--r--implementation/endpoints/src/client_endpoint_impl.cpp20
-rw-r--r--implementation/endpoints/src/local_client_endpoint_impl.cpp6
-rw-r--r--implementation/endpoints/src/tcp_client_endpoint_impl.cpp10
-rw-r--r--implementation/endpoints/src/udp_client_endpoint_impl.cpp7
-rw-r--r--implementation/routing/src/routing_manager_impl.cpp32
-rw-r--r--implementation/routing/src/routing_manager_proxy.cpp15
-rw-r--r--implementation/runtime/include/application_impl.hpp9
-rw-r--r--implementation/runtime/src/application_impl.cpp127
-rw-r--r--implementation/service_discovery/include/service_discovery_impl.hpp33
-rw-r--r--implementation/service_discovery/src/service_discovery_impl.cpp681
13 files changed, 638 insertions, 340 deletions
diff --git a/implementation/configuration/include/policy.hpp b/implementation/configuration/include/policy.hpp
index e94d7b5..89e8a96 100644
--- a/implementation/configuration/include/policy.hpp
+++ b/implementation/configuration/include/policy.hpp
@@ -14,12 +14,18 @@ namespace vsomeip {
namespace cfg {
struct policy {
+ policy() :
+ uid_(0), is_uid_set_(false), gid_(0), is_gid_set_(false) {
+ }
+
std::set<std::pair<service_t, instance_t>> allowed_services_;
std::set<std::pair<service_t, instance_t>> allowed_offers_;
std::set<std::pair<service_t, instance_t>> denied_services_;
std::set<std::pair<service_t, instance_t>> denied_offers_;
std::uint32_t uid_;
+ bool is_uid_set_;
std::uint32_t gid_;
+ bool is_gid_set_;
bool allow_;
};
diff --git a/implementation/configuration/src/configuration_impl.cpp b/implementation/configuration/src/configuration_impl.cpp
index 0920b66..178481c 100644
--- a/implementation/configuration/src/configuration_impl.cpp
+++ b/implementation/configuration/src/configuration_impl.cpp
@@ -1667,7 +1667,7 @@ void configuration_impl::load_policy(const boost::property_tree::ptree &_tree) {
}
}
if (overrides) {
- VSOMEIP_WARNING << std::hex << "Security configuration: "
+ VSOMEIP_INFO << std::hex << "Security configuration: "
<< "Client range 0x" << firstClient
<< " - 0x" << lastClient << " overrides policy of "
<< std::dec << overrides << " clients";
@@ -1684,31 +1684,33 @@ void configuration_impl::load_policy(const boost::property_tree::ptree &_tree) {
its_converter >> client;
if (client != 0x0) {
if (policies_.find(client) != policies_.end()) {
- VSOMEIP_WARNING << std::hex << "Security configuration: "
+ VSOMEIP_INFO << std::hex << "Security configuration: "
<< "Overriding policy for client 0x" << client << ".";
}
policies_[client] = policy;
}
}
} else if (i->first == "credentials") {
- uint32_t uid = 0x0;
- uint32_t gid = 0x0;
for (auto n = i->second.begin();
n != i->second.end(); ++n) {
if (n->first == "uid") {
std::stringstream its_converter;
std::string value = n->second.data();
- its_converter << std::dec << value;
- its_converter >> uid;
+ if (value != "any") {
+ its_converter << std::dec << value;
+ its_converter >> policy->uid_ ;
+ policy->is_uid_set_ = true;
+ }
} else if (n->first == "gid") {
std::stringstream its_converter;
std::string value = n->second.data();
- its_converter << std::dec << value;
- its_converter >> gid;
+ if (value != "any") {
+ its_converter << std::dec << value;
+ its_converter >> policy->gid_ ;
+ policy->is_gid_set_ = true;
+ }
}
}
- policy->uid_ = uid;
- policy->gid_ = gid;
} else if (i->first == "allow") {
if (allow_deny_set) {
VSOMEIP_WARNING << "Security configuration: \"allow\" tag overrides "
@@ -2410,7 +2412,8 @@ bool configuration_impl::check_credentials(client_t _client, uint32_t _uid,
}
auto its_client = policies_.find(_client);
if (its_client != policies_.end()) {
- if (its_client->second->uid_ == _uid && its_client->second->gid_ == _gid) {
+ if ((!its_client->second->is_uid_set_ || its_client->second->uid_ == _uid)
+ && (!its_client->second->is_gid_set_ || its_client->second->gid_ == _gid)) {
return true;
}
}
diff --git a/implementation/endpoints/include/client_endpoint_impl.hpp b/implementation/endpoints/include/client_endpoint_impl.hpp
index d8c758d..b72ea7e 100644
--- a/implementation/endpoints/include/client_endpoint_impl.hpp
+++ b/implementation/endpoints/include/client_endpoint_impl.hpp
@@ -70,6 +70,11 @@ public:
virtual void print_status() = 0;
protected:
+ enum class cei_state_e : std::uint8_t {
+ CLOSED,
+ CONNECTING,
+ ESTABLISHED
+ };
virtual void send_queued() = 0;
void shutdown_and_close_socket(bool _recreate_socket);
void shutdown_and_close_socket_unlocked(bool _recreate_socket);
@@ -84,7 +89,7 @@ protected:
std::mutex connect_timer_mutex_;
boost::asio::steady_timer connect_timer_;
std::atomic<uint32_t> connect_timeout_;
- std::atomic<bool> is_connected_;
+ std::atomic<cei_state_e> state_;
// send data
message_buffer_ptr_t packetizer_;
diff --git a/implementation/endpoints/src/client_endpoint_impl.cpp b/implementation/endpoints/src/client_endpoint_impl.cpp
index 55316a9..be8e684 100644
--- a/implementation/endpoints/src/client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/client_endpoint_impl.cpp
@@ -37,7 +37,7 @@ client_endpoint_impl<Protocol>::client_endpoint_impl(
socket_(new socket_type(_io)), remote_(_remote),
flush_timer_(_io), connect_timer_(_io),
connect_timeout_(VSOMEIP_DEFAULT_CONNECT_TIMEOUT), // TODO: use config variable
- is_connected_(false),
+ state_(cei_state_e::CLOSED),
packetizer_(std::make_shared<message_buffer_t>()),
queue_size_(0),
was_not_connected_(false),
@@ -55,11 +55,16 @@ bool client_endpoint_impl<Protocol>::is_client() const {
template<typename Protocol>
bool client_endpoint_impl<Protocol>::is_connected() const {
- return is_connected_;
+ return state_ == cei_state_e::ESTABLISHED;
}
template<typename Protocol>
-void client_endpoint_impl<Protocol>::set_connected(bool _connected) { is_connected_ = _connected; }
+void client_endpoint_impl<Protocol>::set_connected(bool _connected) {
+ if (_connected) {
+ state_ = cei_state_e::ESTABLISHED;
+ } else {
+ state_ = cei_state_e::CLOSED;
+ } }
template<typename Protocol> void client_endpoint_impl<Protocol>::stop() {
{
std::lock_guard<std::mutex> its_lock(mutex_);
@@ -222,8 +227,8 @@ void client_endpoint_impl<Protocol>::connect_cbk(
if (connect_timeout_ < VSOMEIP_MAX_CONNECT_TIMEOUT)
connect_timeout_ = (connect_timeout_ << 1);
- if (is_connected_) {
- is_connected_ = false;
+ if (state_ != cei_state_e::ESTABLISHED) {
+ state_ = cei_state_e::CLOSED;
its_host->on_disconnect(this->shared_from_this());
}
} else {
@@ -233,7 +238,7 @@ void client_endpoint_impl<Protocol>::connect_cbk(
}
connect_timeout_ = VSOMEIP_DEFAULT_CONNECT_TIMEOUT; // TODO: use config variable
set_local_port();
- if (!is_connected_) {
+ if (state_ != cei_state_e::ESTABLISHED) {
its_host->on_connect(this->shared_from_this());
}
@@ -270,7 +275,7 @@ void client_endpoint_impl<Protocol>::send_cbk(
send_queued();
}
} else if (_error == boost::asio::error::broken_pipe) {
- is_connected_ = false;
+ state_ = cei_state_e::CLOSED;
bool stopping(false);
{
std::lock_guard<std::mutex> its_lock(mutex_);
@@ -319,6 +324,7 @@ void client_endpoint_impl<Protocol>::send_cbk(
connect();
} else if (_error == boost::asio::error::not_connected
|| _error == boost::asio::error::bad_descriptor) {
+ state_ = cei_state_e::CLOSED;
was_not_connected_ = true;
shutdown_and_close_socket(true);
connect();
diff --git a/implementation/endpoints/src/local_client_endpoint_impl.cpp b/implementation/endpoints/src/local_client_endpoint_impl.cpp
index 009ac9f..b84cb0a 100644
--- a/implementation/endpoints/src/local_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/local_client_endpoint_impl.cpp
@@ -46,7 +46,10 @@ bool local_client_endpoint_impl::is_local() const {
}
void local_client_endpoint_impl::restart() {
- is_connected_ = false;
+ if (state_ == cei_state_e::CONNECTING) {
+ return;
+ }
+ state_ = cei_state_e::CONNECTING;
{
std::lock_guard<std::mutex> its_lock(mutex_);
sending_blocked_ = false;
@@ -113,6 +116,7 @@ void local_client_endpoint_impl::connect() {
VSOMEIP_WARNING << "local_client_endpoint_impl::connect: "
<< "couldn't enable SO_REUSEADDR: " << its_error.message();
}
+ state_ = cei_state_e::CONNECTING;
socket_->connect(remote_, its_connect_error);
// Credentials
diff --git a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
index 143b95a..b1a2f74 100644
--- a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp
@@ -61,7 +61,10 @@ void tcp_client_endpoint_impl::start() {
}
void tcp_client_endpoint_impl::restart() {
- is_connected_ = false;
+ if (state_ == cei_state_e::CONNECTING) {
+ return;
+ }
+ state_ = cei_state_e::CONNECTING;
std::string address_port_local;
{
std::lock_guard<std::mutex> its_lock(socket_mutex_);
@@ -137,7 +140,7 @@ void tcp_client_endpoint_impl::connect() {
"Error binding socket: " << its_bind_error.message();
}
}
-
+ state_ = cei_state_e::CONNECTING;
socket_->async_connect(
remote_,
std::bind(
@@ -180,7 +183,6 @@ void tcp_client_endpoint_impl::receive(message_buffer_ptr_t _recv_buffer,
_recv_buffer->resize(its_required_capacity, 0x0);
}
buffer_size = _missing_capacity;
- _missing_capacity = 0;
} else if (buffer_shrink_threshold_
&& shrink_count_ > buffer_shrink_threshold_
&& _recv_buffer_size == 0) {
@@ -517,7 +519,7 @@ void tcp_client_endpoint_impl::receive_cbk(
<< _error.message() << "( " << std::dec << _error.value()
<< ") local: " << get_address_port_local()
<< " remote: " << get_address_port_remote();
- is_connected_ = false;
+ state_ = cei_state_e::CLOSED;
shutdown_and_close_socket_unlocked(false);
} else {
VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk: "
diff --git a/implementation/endpoints/src/udp_client_endpoint_impl.cpp b/implementation/endpoints/src/udp_client_endpoint_impl.cpp
index 6bc0bf5..6252b40 100644
--- a/implementation/endpoints/src/udp_client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/udp_client_endpoint_impl.cpp
@@ -51,7 +51,7 @@ void udp_client_endpoint_impl::connect() {
"Error binding socket: " << its_bind_error.message();
}
}
-
+ state_ = cei_state_e::CONNECTING;
socket_->async_connect(
remote_,
std::bind(
@@ -77,7 +77,10 @@ void udp_client_endpoint_impl::start() {
}
void udp_client_endpoint_impl::restart() {
- is_connected_ = false;
+ if (state_ == cei_state_e::CONNECTING) {
+ return;
+ }
+ state_ = cei_state_e::CONNECTING;
{
std::lock_guard<std::mutex> its_lock(mutex_);
queue_.clear();
diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp
index 0c443fd..2acd4eb 100644
--- a/implementation/routing/src/routing_manager_impl.cpp
+++ b/implementation/routing/src/routing_manager_impl.cpp
@@ -3204,32 +3204,14 @@ void routing_manager_impl::on_identify_response(client_t _client, service_t _ser
void routing_manager_impl::identify_for_subscribe(client_t _client,
service_t _service, instance_t _instance, major_version_t _major,
subscription_type_e _subscription_type) {
- if (_subscription_type == subscription_type_e::SU_RELIABLE_AND_UNRELIABLE
- || _subscription_type == subscription_type_e::SU_PREFER_UNRELIABLE
- || _subscription_type == subscription_type_e::SU_UNRELIABLE) {
- if (!has_identified(_client, _service, _instance, false)
- && !is_identifying(_client, _service, _instance, false)) {
- if (!send_identify_message(_client, _service, _instance, _major,
- false) && _subscription_type
- == subscription_type_e::SU_PREFER_UNRELIABLE) {
- send_identify_message(_client, _service, _instance, _major,
- true);
- }
- }
+ (void)_subscription_type;
+ if (!has_identified(_client, _service, _instance, false)
+ && !is_identifying(_client, _service, _instance, false)) {
+ send_identify_message(_client, _service, _instance, _major, false);
}
-
- if (_subscription_type == subscription_type_e::SU_RELIABLE_AND_UNRELIABLE
- || _subscription_type == subscription_type_e::SU_PREFER_RELIABLE
- || _subscription_type == subscription_type_e::SU_RELIABLE) {
- if (!has_identified(_client, _service, _instance, true)
- && !is_identifying(_client, _service, _instance, true)) {
- if (!send_identify_message(_client, _service, _instance, _major,
- true) && _subscription_type
- == subscription_type_e::SU_PREFER_RELIABLE) {
- send_identify_message(_client, _service, _instance, _major,
- false);
- }
- }
+ if (!has_identified(_client, _service, _instance, true)
+ && !is_identifying(_client, _service, _instance, true)) {
+ send_identify_message(_client, _service, _instance, _major, true);
}
}
diff --git a/implementation/routing/src/routing_manager_proxy.cpp b/implementation/routing/src/routing_manager_proxy.cpp
index a15ae2d..f6aa420 100644
--- a/implementation/routing/src/routing_manager_proxy.cpp
+++ b/implementation/routing/src/routing_manager_proxy.cpp
@@ -363,12 +363,15 @@ void routing_manager_proxy::register_event(client_t _client,
_event, _eventgroups, _is_field, _is_provided);
}
}
- VSOMEIP_INFO << "REGISTER EVENT("
- << std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
- << std::hex << std::setw(4) << std::setfill('0') << _service << "."
- << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
- << std::hex << std::setw(4) << std::setfill('0') << _event
- << ":is_provider=" << _is_provided << "]";
+
+ if(_is_provided) {
+ VSOMEIP_INFO << "REGISTER EVENT("
+ << std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _event
+ << ":is_provider=" << _is_provided << "]";
+ }
}
void routing_manager_proxy::unregister_event(client_t _client,
diff --git a/implementation/runtime/include/application_impl.hpp b/implementation/runtime/include/application_impl.hpp
index 2d6cd82..efd7d7d 100644
--- a/implementation/runtime/include/application_impl.hpp
+++ b/implementation/runtime/include/application_impl.hpp
@@ -203,7 +203,6 @@ private:
sync_handler(std::function<void()> _handler) :
handler_(_handler),
- is_dispatching_(false),
service_id_(ANY_SERVICE),
instance_id_(ANY_INSTANCE),
method_id_(ANY_METHOD),
@@ -215,7 +214,6 @@ private:
method_t _method_id, session_t _session_id,
eventgroup_t _eventgroup_id, handler_type_e _handler_type) :
handler_(nullptr),
- is_dispatching_(false),
service_id_(_service_id),
instance_id_(_instance_id),
method_id_(_method_id),
@@ -224,7 +222,6 @@ private:
handler_type_(_handler_type) { }
std::function<void()> handler_;
- bool is_dispatching_;
service_t service_id_;
instance_t instance_id_;
method_t method_id_;
@@ -268,6 +265,8 @@ private:
void main_dispatch();
void dispatch();
void invoke_handler(std::shared_ptr<sync_handler> &_handler);
+ std::shared_ptr<sync_handler> get_next_handler();
+ void reschedule_availability_handler(const std::shared_ptr<sync_handler> &_handler);
bool has_active_dispatcher();
bool is_active_dispatcher(const std::thread::id &_id);
void remove_elapsed_dispatchers();
@@ -372,6 +371,7 @@ private:
std::set<std::thread::id> running_dispatchers_;
// Mutex to protect access to dispatchers_ & elapsed_dispatchers_
std::mutex dispatcher_mutex_;
+
// Condition to wakeup the dispatcher thread
mutable std::condition_variable dispatcher_condition_;
std::size_t max_dispatchers_;
@@ -419,6 +419,9 @@ private:
bool client_side_logging_;
std::set<std::tuple<service_t, instance_t> > client_side_logging_filter_;
+
+ std::map<std::pair<service_t, instance_t>,
+ std::deque<std::shared_ptr<sync_handler> > > availability_handlers_;
};
} // namespace vsomeip
diff --git a/implementation/runtime/src/application_impl.cpp b/implementation/runtime/src/application_impl.cpp
index d97be8e..720ddb1 100644
--- a/implementation/runtime/src/application_impl.cpp
+++ b/implementation/runtime/src/application_impl.cpp
@@ -1557,13 +1557,18 @@ void application_impl::main_dispatch() {
dispatcher_condition_.wait(its_lock);
}
} else {
- while (is_dispatching_ && !handlers_.empty() && is_active_dispatcher(its_id)) {
- std::shared_ptr<sync_handler> its_handler = handlers_.front();
- handlers_.pop_front();
+ std::shared_ptr<sync_handler> its_handler;
+ while (is_dispatching_ && is_active_dispatcher(its_id)
+ && (its_handler = get_next_handler())) {
its_lock.unlock();
invoke_handler(its_handler);
+
+ if (!is_dispatching_)
+ return;
+
its_lock.lock();
+ reschedule_availability_handler(its_handler);
remove_elapsed_dispatchers();
#ifdef _WIN32
@@ -1596,14 +1601,18 @@ void application_impl::dispatch() {
return;
}
} else {
- while (is_dispatching_ && !handlers_.empty()
- && is_active_dispatcher(its_id)) {
- std::shared_ptr<sync_handler> its_handler = handlers_.front();
- handlers_.pop_front();
+ std::shared_ptr<sync_handler> its_handler;
+ while (is_dispatching_ && is_active_dispatcher(its_id)
+ && (its_handler = get_next_handler())) {
its_lock.unlock();
invoke_handler(its_handler);
+
+ if (!is_dispatching_)
+ return;
+
its_lock.lock();
+ reschedule_availability_handler(its_handler);
remove_elapsed_dispatchers();
}
}
@@ -1616,6 +1625,72 @@ void application_impl::dispatch() {
dispatcher_condition_.notify_all();
}
+std::shared_ptr<application_impl::sync_handler> application_impl::get_next_handler() {
+ std::shared_ptr<sync_handler> its_next_handler;
+ while (!handlers_.empty() && !its_next_handler) {
+ its_next_handler = handlers_.front();
+ handlers_.pop_front();
+
+ // Check handler
+ if (its_next_handler->handler_type_ == handler_type_e::AVAILABILITY) {
+ const std::pair<service_t, instance_t> its_si_pair = std::make_pair(
+ its_next_handler->service_id_,
+ its_next_handler->instance_id_);
+ auto found_si = availability_handlers_.find(its_si_pair);
+ if (found_si != availability_handlers_.end()
+ && !found_si->second.empty()
+ && found_si->second.front() != its_next_handler) {
+ found_si->second.push_back(its_next_handler);
+ // There is a running availability handler for this service.
+ // Therefore, this one must wait...
+ its_next_handler = nullptr;
+ } else {
+ availability_handlers_[its_si_pair].push_back(its_next_handler);
+ }
+ } else if (its_next_handler->handler_type_ == handler_type_e::MESSAGE) {
+ const std::pair<service_t, instance_t> its_si_pair = std::make_pair(
+ its_next_handler->service_id_,
+ its_next_handler->instance_id_);
+ auto found_si = availability_handlers_.find(its_si_pair);
+ if (found_si != availability_handlers_.end()
+ && found_si->second.size() > 1) {
+ // The message comes after the next availability handler
+ // Therefore, queue it to the last one
+ found_si->second.push_back(its_next_handler);
+ its_next_handler = nullptr;
+ }
+ }
+ }
+
+ return its_next_handler;
+}
+
+void application_impl::reschedule_availability_handler(
+ const std::shared_ptr<sync_handler> &_handler) {
+ if (_handler->handler_type_ == handler_type_e::AVAILABILITY) {
+ const std::pair<service_t, instance_t> its_si_pair = std::make_pair(
+ _handler->service_id_, _handler->instance_id_);
+ auto found_si = availability_handlers_.find(its_si_pair);
+ if (found_si != availability_handlers_.end()) {
+ if (!found_si->second.empty()
+ && found_si->second.front() == _handler) {
+ found_si->second.pop_front();
+
+ // If there are other availability handlers pending, schedule
+ // them and all handlers that were queued because of them
+ for (auto it = found_si->second.rbegin();
+ it != found_si->second.rend(); it++) {
+ handlers_.push_front(*it);
+ }
+ availability_handlers_.erase(found_si);
+ }
+ return;
+ }
+ VSOMEIP_WARNING << __func__
+ << ": An unknown availability handler returned!";
+ }
+}
+
void application_impl::invoke_handler(std::shared_ptr<sync_handler> &_handler) {
const std::thread::id its_id = std::this_thread::get_id();
@@ -1669,11 +1744,17 @@ void application_impl::invoke_handler(std::shared_ptr<sync_handler> &_handler) {
<< "type=" << static_cast<std::uint32_t>(its_sync_handler->handler_type_)
<< " thread=" << std::hex << its_id;
}
- if (is_dispatching_) {
- {
- std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
+
+ while (is_dispatching_ ) {
+ if (dispatcher_mutex_.try_lock()) {
running_dispatchers_.insert(its_id);
+ dispatcher_mutex_.unlock();
+ break;
}
+ std::this_thread::yield();
+ }
+
+ if (is_dispatching_) {
try {
_handler->handler_();
} catch (const std::exception &e) {
@@ -1684,9 +1765,14 @@ void application_impl::invoke_handler(std::shared_ptr<sync_handler> &_handler) {
}
boost::system::error_code ec;
its_dispatcher_timer.cancel(ec);
- if (is_dispatching_) {
- std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
- running_dispatchers_.erase(its_id);
+
+ while (is_dispatching_ ) {
+ if (dispatcher_mutex_.try_lock()) {
+ running_dispatchers_.erase(its_id);
+ dispatcher_mutex_.unlock();
+ return;
+ }
+ std::this_thread::yield();
}
}
@@ -1801,6 +1887,7 @@ void application_impl::shutdown() {
its_dispatcher.second->detach();
}
}
+ availability_handlers_.clear();
running_dispatchers_.clear();
elapsed_dispatchers_.clear();
dispatchers_.clear();
@@ -2033,13 +2120,13 @@ bool application_impl::check_subscription_state(service_t _service, instance_t _
void application_impl::print_blocking_call(std::shared_ptr<sync_handler> _handler) {
switch (_handler->handler_type_) {
case handler_type_e::AVAILABILITY:
- VSOMEIP_INFO << "BLOCKING CALL AVAILABILITY("
+ VSOMEIP_WARNING << "BLOCKING CALL AVAILABILITY("
<< std::hex << std::setw(4) << std::setfill('0') << get_client() <<"): ["
<< std::hex << std::setw(4) << std::setfill('0') << _handler->service_id_ << "."
<< std::hex << std::setw(4) << std::setfill('0') << _handler->instance_id_ << "]";
break;
case handler_type_e::MESSAGE:
- VSOMEIP_INFO << "BLOCKING CALL MESSAGE("
+ VSOMEIP_WARNING << "BLOCKING CALL MESSAGE("
<< std::hex << std::setw(4) << std::setfill('0') << get_client() <<"): ["
<< std::hex << std::setw(4) << std::setfill('0') << _handler->service_id_ << "."
<< std::hex << std::setw(4) << std::setfill('0') << _handler->instance_id_ << "."
@@ -2047,11 +2134,11 @@ void application_impl::print_blocking_call(std::shared_ptr<sync_handler> _handle
<< std::hex << std::setw(4) << std::setfill('0') << _handler->session_id_ << "]";
break;
case handler_type_e::STATE:
- VSOMEIP_INFO << "BLOCKING CALL STATE("
+ VSOMEIP_WARNING << "BLOCKING CALL STATE("
<< std::hex << std::setw(4) << std::setfill('0') << get_client() << ")";
break;
case handler_type_e::SUBSCRIPTION:
- VSOMEIP_INFO << "BLOCKING CALL SUBSCRIPTION("
+ VSOMEIP_WARNING << "BLOCKING CALL SUBSCRIPTION("
<< std::hex << std::setw(4) << std::setfill('0') << get_client() <<"): ["
<< std::hex << std::setw(4) << std::setfill('0') << _handler->service_id_ << "."
<< std::hex << std::setw(4) << std::setfill('0') << _handler->instance_id_ << "."
@@ -2059,15 +2146,15 @@ void application_impl::print_blocking_call(std::shared_ptr<sync_handler> _handle
<< std::hex << std::setw(4) << std::setfill('0') << _handler->method_id_ << "]";
break;
case handler_type_e::OFFERED_SERVICES_INFO:
- VSOMEIP_INFO << "BLOCKING CALL OFFERED_SERVICES_INFO("
+ VSOMEIP_WARNING << "BLOCKING CALL OFFERED_SERVICES_INFO("
<< std::hex << std::setw(4) << std::setfill('0') << get_client() <<")";
break;
case handler_type_e::WATCHDOG:
- VSOMEIP_INFO << "BLOCKING CALL WATCHDOG("
+ VSOMEIP_WARNING << "BLOCKING CALL WATCHDOG("
<< std::hex << std::setw(4) << std::setfill('0') << get_client() <<")";
break;
case handler_type_e::UNKNOWN:
- VSOMEIP_INFO << "BLOCKING CALL UNKNOWN("
+ VSOMEIP_WARNING << "BLOCKING CALL UNKNOWN("
<< std::hex << std::setw(4) << std::setfill('0') << get_client() << ")";
break;
}
diff --git a/implementation/service_discovery/include/service_discovery_impl.hpp b/implementation/service_discovery/include/service_discovery_impl.hpp
index 020de59..039fe7b 100644
--- a/implementation/service_discovery/include/service_discovery_impl.hpp
+++ b/implementation/service_discovery/include/service_discovery_impl.hpp
@@ -122,12 +122,20 @@ private:
service_t _service, instance_t _instance,
const std::shared_ptr<const serviceinfo> &_info,
uint32_t &_size);
- void insert_subscription(std::shared_ptr<message_impl> &_message,
+ enum remote_offer_type_e : std::uint8_t {
+ RELIABLE_UNRELIABLE,
+ RELIABLE,
+ UNRELIABLE,
+ UNKNOWN = 0xff
+ };
+ bool insert_subscription(std::shared_ptr<message_impl> &_message,
service_t _service, instance_t _instance, eventgroup_t _eventgroup,
- std::shared_ptr<subscription> &_subscription, bool _insert_reliable, bool _insert_unreliable);
- void insert_nack_subscription_on_resubscribe(std::shared_ptr<message_impl> &_message,
+ std::shared_ptr<subscription> &_subscription,
+ remote_offer_type_e _offer_type);
+ bool insert_nack_subscription_on_resubscribe(std::shared_ptr<message_impl> &_message,
service_t _service, instance_t _instance, eventgroup_t _eventgroup,
- std::shared_ptr<subscription> &_subscription);
+ std::shared_ptr<subscription> &_subscription,
+ remote_offer_type_e _offer_type);
void insert_subscription_ack(std::shared_ptr<message_impl> &_message,
service_t _service, instance_t _instance, eventgroup_t _eventgroup,
const std::shared_ptr<eventgroupinfo> &_info, ttl_t _ttl,
@@ -206,8 +214,7 @@ private:
const std::shared_ptr<const message> &_message) const;
bool check_layer_four_protocol(
const std::shared_ptr<const ip_option_impl> _ip_option) const;
- void get_subscription_endpoints(subscription_type_e _subscription_type,
- std::shared_ptr<endpoint>& _unreliable,
+ void get_subscription_endpoints(std::shared_ptr<endpoint>& _unreliable,
std::shared_ptr<endpoint>& _reliable,
boost::asio::ip::address* _address,
bool* _has_address,
@@ -322,6 +329,16 @@ private:
void on_last_msg_received_timer_expired(const boost::system::error_code &_error);
void stop_last_msg_received_timer();
+
+ remote_offer_type_e get_remote_offer_type(service_t _service, instance_t _instance);
+ bool update_remote_offer_type(service_t _service, instance_t _instance,
+ remote_offer_type_e _offer_type,
+ const boost::asio::ip::address &_reliable_address,
+ const boost::asio::ip::address &_unreliable_address);
+ void remove_remote_offer_type(service_t _service, instance_t _instance,
+ const boost::asio::ip::address &_address);
+ void remove_remote_offer_type_by_ip(const boost::asio::ip::address &_address);
+
private:
boost::asio::io_service &io_;
service_discovery_host *host_;
@@ -417,6 +434,10 @@ private:
std::mutex last_msg_received_timer_mutex_;
boost::asio::steady_timer last_msg_received_timer_;
std::chrono::milliseconds last_msg_received_timer_timeout_;
+
+ std::mutex remote_offer_types_mutex_;
+ std::map<std::pair<service_t, instance_t>, remote_offer_type_e> remote_offer_types_;
+ std::map<boost::asio::ip::address, std::set<std::pair<service_t, instance_t>>> remote_offers_by_ip_;
};
} // namespace sd
diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp
index d617e0c..2d5aaff 100644
--- a/implementation/service_discovery/src/service_discovery_impl.cpp
+++ b/implementation/service_discovery/src/service_discovery_impl.cpp
@@ -258,7 +258,7 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance,
bool has_address(false);
boost::asio::ip::address its_address;
- get_subscription_endpoints(_subscription_type, its_unreliable, its_reliable,
+ get_subscription_endpoints(its_unreliable, its_reliable,
&its_address, &has_address, _service, _instance, _client);
std::shared_ptr<runtime> its_runtime = runtime_.lock();
@@ -286,41 +286,47 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance,
}
}
- if (its_subscription->get_endpoint(true) &&
+ const remote_offer_type_e its_offer_type = get_remote_offer_type(
+ _service, _instance);
+
+ if (its_offer_type == remote_offer_type_e::UNRELIABLE &&
+ !its_subscription->get_endpoint(true) &&
its_subscription->get_endpoint(false)) {
- if (its_subscription->get_endpoint(true)->is_connected() &&
- its_subscription->get_endpoint(false)->is_connected()) {
+ if (its_subscription->get_endpoint(false)->is_connected()) {
insert_subscription(its_message,
_service, _instance,
_eventgroup,
- its_subscription, true, true);
+ its_subscription, its_offer_type);
} else {
- if (!its_subscription->get_endpoint(true)->is_connected()) {
- its_subscription->set_tcp_connection_established(false);
- }
- if (!its_subscription->get_endpoint(false)->is_connected()) {
- its_subscription->set_udp_connection_established(false);
- }
+ its_subscription->set_udp_connection_established(false);
}
- } else if (its_subscription->get_endpoint(true) &&
+ } else if (its_offer_type == remote_offer_type_e::RELIABLE &&
+ its_subscription->get_endpoint(true) &&
!its_subscription->get_endpoint(false)) {
if (its_subscription->get_endpoint(true)->is_connected()) {
insert_subscription(its_message,
_service, _instance,
_eventgroup,
- its_subscription, true, false);
+ its_subscription, its_offer_type);
} else {
its_subscription->set_tcp_connection_established(false);
}
- } else if (!its_subscription->get_endpoint(true) &&
+ } else if (its_offer_type == remote_offer_type_e::RELIABLE_UNRELIABLE &&
+ its_subscription->get_endpoint(true) &&
its_subscription->get_endpoint(false)) {
- if (its_subscription->get_endpoint(false)->is_connected()) {
+ if (its_subscription->get_endpoint(true)->is_connected() &&
+ its_subscription->get_endpoint(false)->is_connected()) {
insert_subscription(its_message,
_service, _instance,
_eventgroup,
- its_subscription, false, true);
+ its_subscription, its_offer_type);
} else {
- its_subscription->set_udp_connection_established(false);
+ if (!its_subscription->get_endpoint(true)->is_connected()) {
+ its_subscription->set_tcp_connection_established(false);
+ }
+ if (!its_subscription->get_endpoint(false)->is_connected()) {
+ its_subscription->set_udp_connection_established(false);
+ }
}
}
@@ -336,107 +342,30 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance,
}
void service_discovery_impl::get_subscription_endpoints(
- subscription_type_e _subscription_type,
std::shared_ptr<endpoint>& _unreliable,
std::shared_ptr<endpoint>& _reliable, boost::asio::ip::address* _address,
bool* _has_address,
service_t _service, instance_t _instance, client_t _client) const {
- switch (_subscription_type) {
- case subscription_type_e::SU_RELIABLE_AND_UNRELIABLE:
- _reliable = host_->find_or_create_remote_client(_service, _instance,
- true, _client);
- _unreliable = host_->find_or_create_remote_client(_service,
- _instance, false, _client);
- if (_unreliable) {
- std::shared_ptr<client_endpoint> its_client_endpoint =
- std::dynamic_pointer_cast<client_endpoint>(_unreliable);
- if (its_client_endpoint) {
- *_has_address = its_client_endpoint->get_remote_address(
+ _reliable = host_->find_or_create_remote_client(_service, _instance,
+ true, _client);
+ _unreliable = host_->find_or_create_remote_client(_service,
+ _instance, false, _client);
+ if (_unreliable) {
+ std::shared_ptr<client_endpoint> its_client_endpoint =
+ std::dynamic_pointer_cast<client_endpoint>(_unreliable);
+ if (its_client_endpoint) {
+ *_has_address = its_client_endpoint->get_remote_address(
+ *_address);
+ }
+ }
+ if (_reliable) {
+ std::shared_ptr<client_endpoint> its_client_endpoint =
+ std::dynamic_pointer_cast<client_endpoint>(_reliable);
+ if (its_client_endpoint) {
+ *_has_address = *_has_address
+ || its_client_endpoint->get_remote_address(
*_address);
- }
- }
- if (_reliable) {
- std::shared_ptr<client_endpoint> its_client_endpoint =
- std::dynamic_pointer_cast<client_endpoint>(_reliable);
- if (its_client_endpoint) {
- *_has_address = *_has_address
- || its_client_endpoint->get_remote_address(
- *_address);
- }
- }
- break;
- case subscription_type_e::SU_PREFER_UNRELIABLE:
- _unreliable = host_->find_or_create_remote_client(_service,
- _instance, false, _client);
- if (_unreliable) {
- std::shared_ptr<client_endpoint> its_client_endpoint =
- std::dynamic_pointer_cast<client_endpoint>(_unreliable);
- if (its_client_endpoint) {
- *_has_address = its_client_endpoint->get_remote_address(
- *_address);
- }
- } else {
- _reliable = host_->find_or_create_remote_client(_service,
- _instance, true, _client);
- if (_reliable) {
- std::shared_ptr<client_endpoint> its_client_endpoint =
- std::dynamic_pointer_cast<client_endpoint>(
- _reliable);
- if (its_client_endpoint) {
- *_has_address = its_client_endpoint->get_remote_address(
- *_address);
- }
- }
- }
- break;
- case subscription_type_e::SU_PREFER_RELIABLE:
- _reliable = host_->find_or_create_remote_client(_service,
- _instance, true, _client);
- if (_reliable) {
- std::shared_ptr<client_endpoint> its_client_endpoint =
- std::dynamic_pointer_cast<client_endpoint>(_reliable);
- if (its_client_endpoint) {
- *_has_address = its_client_endpoint->get_remote_address(
- *_address);
- }
- } else {
- _unreliable = host_->find_or_create_remote_client(_service,
- _instance, false, _client);
- if (_unreliable) {
- std::shared_ptr<client_endpoint> its_client_endpoint =
- std::dynamic_pointer_cast<client_endpoint>(
- _unreliable);
- if (its_client_endpoint) {
- *_has_address = its_client_endpoint->get_remote_address(
- *_address);
- }
- }
- }
- break;
- case subscription_type_e::SU_UNRELIABLE:
- _unreliable = host_->find_or_create_remote_client(_service,
- _instance,
- false, _client);
- if (_unreliable) {
- std::shared_ptr<client_endpoint> its_client_endpoint =
- std::dynamic_pointer_cast<client_endpoint>(_unreliable);
- if (its_client_endpoint) {
- *_has_address = its_client_endpoint->get_remote_address(
- *_address);
- }
- }
- break;
- case subscription_type_e::SU_RELIABLE:
- _reliable = host_->find_or_create_remote_client(_service, _instance,
- true, _client);
- if (_reliable) {
- std::shared_ptr<client_endpoint> its_client_endpoint =
- std::dynamic_pointer_cast<client_endpoint>(_reliable);
- if (its_client_endpoint) {
- *_has_address = its_client_endpoint->get_remote_address(
- *_address);
- }
- }
+ }
}
}
@@ -456,6 +385,8 @@ void service_discovery_impl::unsubscribe(service_t _service,
if (found_service != subscribed_.end()) {
auto found_instance = found_service->second.find(_instance);
if (found_instance != found_service->second.end()) {
+ const remote_offer_type_e its_offer_type = get_remote_offer_type(
+ _service, _instance);
auto found_eventgroup = found_instance->second.find(_eventgroup);
if (found_eventgroup != found_instance->second.end()) {
auto found_client = found_eventgroup->second.find(_client);
@@ -488,8 +419,8 @@ void service_discovery_impl::unsubscribe(service_t _service,
return;
}
}
- insert_subscription(its_message, _service, _instance, _eventgroup,
- its_subscription, true, true);
+ insert_subscription(its_message, _service, _instance,
+ _eventgroup, its_subscription, its_offer_type);
}
}
}
@@ -535,6 +466,8 @@ void service_discovery_impl::unsubscribe_client(service_t _service,
if (found_service != subscribed_.end()) {
auto found_instance = found_service->second.find(_instance);
if (found_instance != found_service->second.end()) {
+ const remote_offer_type_e its_offer_type = get_remote_offer_type(
+ _service, _instance);
for (auto &found_eventgroup : found_instance->second) {
auto found_client = found_eventgroup.second.find(_client);
if (found_client != found_eventgroup.second.end()) {
@@ -570,8 +503,7 @@ void service_discovery_impl::unsubscribe_client(service_t _service,
}
}
insert_subscription(its_message, _service, _instance,
- found_eventgroup.first, its_subscription, true,
- true);
+ found_eventgroup.first, its_subscription, its_offer_type);
}
}
}
@@ -895,117 +827,207 @@ void service_discovery_impl::insert_offer_entries(
_done = true;
}
-void service_discovery_impl::insert_subscription(
+bool service_discovery_impl::insert_subscription(
std::shared_ptr<message_impl> &_message, service_t _service,
instance_t _instance, eventgroup_t _eventgroup,
std::shared_ptr<subscription> &_subscription,
- bool _insert_reliable, bool _insert_unreliable) {
- if((_insert_reliable && !_insert_unreliable && !_subscription->get_endpoint(true)) ||
- (_insert_unreliable && !_insert_reliable && !_subscription->get_endpoint(false))) {
- // don't create an eventgroup entry if there isn't an endpoint option
- // to insert
- return;
- }
- std::shared_ptr < eventgroupentry_impl > its_entry =
- _message->create_eventgroup_entry();
- its_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP);
- its_entry->set_service(_service);
- its_entry->set_instance(_instance);
- its_entry->set_eventgroup(_eventgroup);
- its_entry->set_counter(_subscription->get_counter());
- its_entry->set_major_version(_subscription->get_major());
- its_entry->set_ttl(_subscription->get_ttl());
- std::shared_ptr < endpoint > its_endpoint;
- if (_insert_reliable) {
- its_endpoint = _subscription->get_endpoint(true);
- if (its_endpoint) {
- const std::uint16_t its_port = its_endpoint->get_local_port();
- if (its_port) {
- insert_option(_message, its_entry, unicast_, its_port, true);
- } else {
- VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as "
- "local reliable port is zero: ["
- << std::hex << std::setw(4) << std::setfill('0') << _service << "."
- << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
- << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
+ remote_offer_type_e _offer_type) {
+ bool ret(false);
+ std::shared_ptr<endpoint> its_reliable_endpoint(_subscription->get_endpoint(true));
+ std::shared_ptr<endpoint> its_unreliable_endpoint(_subscription->get_endpoint(false));
+
+ bool insert_reliable(false);
+ bool insert_unreliable(false);
+ switch (_offer_type) {
+ case remote_offer_type_e::RELIABLE:
+ if (its_reliable_endpoint) {
+ insert_reliable = true;
}
- }
- }
- if (_insert_unreliable) {
- its_endpoint = _subscription->get_endpoint(false);
- if (its_endpoint) {
- const std::uint16_t its_port = its_endpoint->get_local_port();
- if (its_port) {
- insert_option(_message, its_entry, unicast_, its_port, false);
- } else {
- VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as "
- " local unreliable port is zero: ["
- << std::hex << std::setw(4) << std::setfill('0') << _service << "."
- << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
- << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
+ break;
+ case remote_offer_type_e::UNRELIABLE:
+ if (its_unreliable_endpoint) {
+ insert_unreliable = true;
}
- }
+ break;
+ case remote_offer_type_e::RELIABLE_UNRELIABLE:
+ if (its_reliable_endpoint && its_unreliable_endpoint) {
+ insert_reliable = true;
+ insert_unreliable = true;
+ }
+ break;
+ default:
+ break;
}
-}
-
-void service_discovery_impl::insert_nack_subscription_on_resubscribe(std::shared_ptr<message_impl> &_message,
- service_t _service, instance_t _instance, eventgroup_t _eventgroup,
- std::shared_ptr<subscription> &_subscription) {
- // SIP_SD_844:
- // This method is used for not acknowledged subscriptions on renew subscription
- // Two entries: Stop subscribe & subscribe within one SD-Message
- // One option: Both entries reference it
-
- std::shared_ptr < eventgroupentry_impl > its_stop_entry =
- _message->create_eventgroup_entry();
- its_stop_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP);
- its_stop_entry->set_service(_service);
- its_stop_entry->set_instance(_instance);
- its_stop_entry->set_eventgroup(_eventgroup);
- its_stop_entry->set_counter(_subscription->get_counter());
- its_stop_entry->set_major_version(_subscription->get_major());
- its_stop_entry->set_ttl(0);
-
- std::shared_ptr < eventgroupentry_impl > its_entry =
- _message->create_eventgroup_entry();
- its_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP);
- its_entry->set_service(_service);
- its_entry->set_instance(_instance);
- its_entry->set_eventgroup(_eventgroup);
- its_entry->set_counter(_subscription->get_counter());
- its_entry->set_major_version(_subscription->get_major());
- its_entry->set_ttl(_subscription->get_ttl());
-
- std::shared_ptr < endpoint > its_endpoint;
- its_endpoint = _subscription->get_endpoint(true);
- if (its_endpoint && its_endpoint->is_connected()) {
- const std::uint16_t its_port = its_endpoint->get_local_port();
+ if (!insert_reliable && !insert_unreliable) {
+ VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as "
+ "subscription doesn't match offer type: ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "] "
+ << _offer_type;
+ return false;
+ }
+ std::shared_ptr<eventgroupentry_impl> its_entry;
+ if (insert_reliable && its_reliable_endpoint) {
+ const std::uint16_t its_port = its_reliable_endpoint->get_local_port();
if (its_port) {
- insert_option(_message, its_stop_entry, unicast_, its_port, true);
+ its_entry = _message->create_eventgroup_entry();
+ its_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP);
+ its_entry->set_service(_service);
+ its_entry->set_instance(_instance);
+ its_entry->set_eventgroup(_eventgroup);
+ its_entry->set_counter(_subscription->get_counter());
+ its_entry->set_major_version(_subscription->get_major());
+ its_entry->set_ttl(_subscription->get_ttl());
insert_option(_message, its_entry, unicast_, its_port, true);
+ ret = true;
} else {
VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as "
"local reliable port is zero: ["
<< std::hex << std::setw(4) << std::setfill('0') << _service << "."
<< std::hex << std::setw(4) << std::setfill('0') << _instance << "."
<< std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
+ ret = false;
}
}
- its_endpoint = _subscription->get_endpoint(false);
- if (its_endpoint) {
- const std::uint16_t its_port = its_endpoint->get_local_port();
+ if (insert_unreliable && its_unreliable_endpoint) {
+ const std::uint16_t its_port = its_unreliable_endpoint->get_local_port();
if (its_port) {
- insert_option(_message, its_stop_entry, unicast_, its_port, false);
+ if (!its_entry) {
+ its_entry = _message->create_eventgroup_entry();
+ its_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP);
+ its_entry->set_service(_service);
+ its_entry->set_instance(_instance);
+ its_entry->set_eventgroup(_eventgroup);
+ its_entry->set_counter(_subscription->get_counter());
+ its_entry->set_major_version(_subscription->get_major());
+ its_entry->set_ttl(_subscription->get_ttl());
+ }
insert_option(_message, its_entry, unicast_, its_port, false);
+ ret = true;
} else {
VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as "
- "local unreliable port is zero: ["
+ " local unreliable port is zero: ["
<< std::hex << std::setw(4) << std::setfill('0') << _service << "."
<< std::hex << std::setw(4) << std::setfill('0') << _instance << "."
<< std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
+ ret = false;
}
}
+ return ret;
+}
+
+bool service_discovery_impl::insert_nack_subscription_on_resubscribe(std::shared_ptr<message_impl> &_message,
+ service_t _service, instance_t _instance, eventgroup_t _eventgroup,
+ std::shared_ptr<subscription> &_subscription, remote_offer_type_e _offer_type) {
+ bool ret(false);
+ // SIP_SD_844:
+ // This method is used for not acknowledged subscriptions on renew subscription
+ // Two entries: Stop subscribe & subscribe within one SD-Message
+ // One option: Both entries reference it
+
+ const std::function<std::shared_ptr<eventgroupentry_impl>(ttl_t)> insert_entry
+ = [&](ttl_t _ttl) {
+ std::shared_ptr<eventgroupentry_impl> its_entry =
+ _message->create_eventgroup_entry();
+ // SUBSCRIBE_EVENTGROUP and STOP_SUBSCRIBE_EVENTGROUP are identical
+ its_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP);
+ its_entry->set_service(_service);
+ its_entry->set_instance(_instance);
+ its_entry->set_eventgroup(_eventgroup);
+ its_entry->set_counter(_subscription->get_counter());
+ its_entry->set_major_version(_subscription->get_major());
+ its_entry->set_ttl(_ttl);
+ return its_entry;
+ };
+
+ std::shared_ptr<endpoint> its_reliable_endpoint(_subscription->get_endpoint(true));
+ std::shared_ptr<endpoint> its_unreliable_endpoint(_subscription->get_endpoint(false));
+
+ if (_offer_type == remote_offer_type_e::UNRELIABLE &&
+ !its_reliable_endpoint && its_unreliable_endpoint) {
+ if (its_unreliable_endpoint->is_connected()) {
+ const std::uint16_t its_port = its_unreliable_endpoint->get_local_port();
+ if (its_port) {
+ std::shared_ptr<eventgroupentry_impl> its_stop_entry = insert_entry(0);
+ std::shared_ptr<eventgroupentry_impl> its_entry = insert_entry(_subscription->get_ttl());
+ insert_option(_message, its_stop_entry, unicast_, its_port, false);
+ insert_option(_message, its_entry, unicast_, its_port, false);
+ ret = true;
+ } else {
+ VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as "
+ "local unreliable port is zero: ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
+ }
+ } else {
+ _subscription->set_udp_connection_established(false);
+ }
+ } else if (_offer_type == remote_offer_type_e::RELIABLE &&
+ its_reliable_endpoint && !its_unreliable_endpoint) {
+ if (its_reliable_endpoint->is_connected()) {
+ const std::uint16_t its_port = its_reliable_endpoint->get_local_port();
+ if (its_port) {
+ std::shared_ptr<eventgroupentry_impl> its_stop_entry = insert_entry(0);
+ std::shared_ptr<eventgroupentry_impl> its_entry = insert_entry(_subscription->get_ttl());
+ insert_option(_message, its_stop_entry, unicast_, its_port, true);
+ insert_option(_message, its_entry, unicast_, its_port, true);
+ ret = true;
+ } else {
+ VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as "
+ "local reliable port is zero: ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
+ }
+ } else {
+ _subscription->set_tcp_connection_established(false);
+ }
+ } else if (_offer_type == remote_offer_type_e::RELIABLE_UNRELIABLE &&
+ its_reliable_endpoint && its_unreliable_endpoint) {
+ if (its_reliable_endpoint->is_connected() &&
+ its_unreliable_endpoint->is_connected()) {
+ const std::uint16_t its_reliable_port = its_reliable_endpoint->get_local_port();
+ const std::uint16_t its_unreliable_port = its_unreliable_endpoint->get_local_port();
+ if (its_reliable_port && its_unreliable_port) {
+ std::shared_ptr<eventgroupentry_impl> its_stop_entry = insert_entry(0);
+ std::shared_ptr<eventgroupentry_impl> its_entry = insert_entry(_subscription->get_ttl());
+ insert_option(_message, its_stop_entry, unicast_, its_reliable_port, true);
+ insert_option(_message, its_entry, unicast_, its_reliable_port, true);
+ insert_option(_message, its_stop_entry, unicast_, its_unreliable_port, false);
+ insert_option(_message, its_entry, unicast_, its_unreliable_port, false);
+ ret = true;
+ } else if (!its_reliable_port) {
+ VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as "
+ "local reliable port is zero: ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
+ } else if (!its_unreliable_port) {
+ VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as "
+ "local unreliable port is zero: ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
+ }
+ } else {
+ if (!its_reliable_endpoint->is_connected()) {
+ _subscription->set_tcp_connection_established(false);
+ }
+ if (!its_unreliable_endpoint->is_connected()) {
+ _subscription->set_udp_connection_established(false);
+ }
+ }
+ } else {
+ VSOMEIP_WARNING << __func__ << ": Couldn't insert StopSubscribe/Subscribe ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "] "
+ << _offer_type;
+ }
+ return ret;
}
void service_discovery_impl::insert_subscription_ack(
@@ -1151,6 +1173,7 @@ void service_discovery_impl::on_message(const byte_t *_data, length_t _length,
if (is_reboot(_sender, _destination,
its_message->get_reboot_flag(), its_message->get_session())) {
VSOMEIP_INFO << "Reboot detected: IP=" << _sender.to_string();
+ remove_remote_offer_type_by_ip(_sender);
host_->expire_subscriptions(_sender);
host_->expire_services(_sender);
}
@@ -1341,6 +1364,9 @@ void service_discovery_impl::process_serviceentry(
// ID: SIP_SD_830
its_request->set_sent_counter(std::uint8_t(repetitions_max_ + 1));
}
+ remove_remote_offer_type(its_service, its_instance,
+ (its_reliable_port != ILLEGAL_PORT ?
+ its_reliable_address : its_unreliable_address));
unsubscribe_all(its_service, its_instance);
if (!is_diagnosis_ && !is_suspended_) {
host_->del_routing_info(its_service, its_instance,
@@ -1367,6 +1393,31 @@ void service_discovery_impl::process_offerservice_serviceentry(
std::lock_guard<std::mutex> its_lock(requested_mutex_);
its_request->set_sent_counter(std::uint8_t(repetitions_max_ + 1));
}
+ remote_offer_type_e offer_type(remote_offer_type_e::UNKNOWN);
+ if (_reliable_port != ILLEGAL_PORT
+ && _unreliable_port != ILLEGAL_PORT
+ && !_reliable_address.is_unspecified()
+ && !_unreliable_address.is_unspecified()) {
+ offer_type = remote_offer_type_e::RELIABLE_UNRELIABLE;
+ } else if (_unreliable_port != ILLEGAL_PORT
+ && !_unreliable_address.is_unspecified()) {
+ offer_type = remote_offer_type_e::UNRELIABLE;
+ } else if (_reliable_port != ILLEGAL_PORT
+ && !_reliable_address.is_unspecified()) {
+ offer_type = remote_offer_type_e::RELIABLE;
+ } else {
+ VSOMEIP_WARNING << __func__ << ": unknown remote offer type ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "]";
+ }
+
+ if (update_remote_offer_type(_service,_instance, offer_type,
+ _reliable_address, _unreliable_address)) {
+ VSOMEIP_WARNING << __func__ << ": Remote offer type changed ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "]";
+ }
+
host_->add_routing_info(_service, _instance,
_major, _minor,
@@ -1392,6 +1443,8 @@ void service_discovery_impl::process_offerservice_serviceentry(
auto found_instance = found_service->second.find(_instance);
if (found_instance != found_service->second.end()) {
if (0 < found_instance->second.size()) {
+ const remote_offer_type_e its_offer_type =
+ get_remote_offer_type(_service, _instance);
for (auto its_eventgroup : found_instance->second) {
for (auto its_client : its_eventgroup.second) {
std::shared_ptr<subscription> its_subscription(its_client.second);
@@ -1400,7 +1453,6 @@ void service_discovery_impl::process_offerservice_serviceentry(
bool has_address(false);
boost::asio::ip::address its_address;
get_subscription_endpoints(
- its_client.second->get_subscription_type(),
its_unreliable, its_reliable, &its_address,
&has_address, _service, _instance,
its_client.first);
@@ -1420,66 +1472,113 @@ void service_discovery_impl::process_offerservice_serviceentry(
}
if (its_subscription->is_acknowledged()) {
- if (its_subscription->get_endpoint(true)
- && its_subscription->get_endpoint(true)->is_connected()) {
- // 40 = 16 (subscription) + 2x12 (option)
- check_space(40);
- const std::size_t options_size_before =
- _resubscribes->back().second->get_options().size();
- insert_subscription(_resubscribes->back().second,
- _service, _instance,
- its_eventgroup.first,
- its_subscription, true, true);
- const std::size_t options_size_after =
- _resubscribes->back().second->get_options().size();
- const std::size_t diff = options_size_after - options_size_before;
- _resubscribes->back().first =
- static_cast<std::uint16_t>(
- _resubscribes->back().first + (12u * diff - 24u));
- } else {
- // don't insert reliable endpoint option if the
- // TCP client endpoint is not yet connected
+ if (its_offer_type == remote_offer_type_e::UNRELIABLE &&
+ its_unreliable && its_unreliable->is_connected()) {
// 28 = 16 (subscription) + 12 (option)
check_space(28);
const std::size_t options_size_before =
_resubscribes->back().second->get_options().size();
- insert_subscription(_resubscribes->back().second,
+ if (insert_subscription(_resubscribes->back().second,
_service, _instance,
its_eventgroup.first,
- its_subscription, false, true);
- its_client.second->set_tcp_connection_established(false);
- const std::size_t options_size_after =
- _resubscribes->back().second->get_options().size();
- const std::size_t diff = options_size_after - options_size_before;
- _resubscribes->back().first =
- static_cast<std::uint16_t>(
- _resubscribes->back().first + (12u * diff - 12u));
- // restart TCP endpoint if not connected
- if (its_subscription->get_endpoint(true)
- && !its_subscription->get_endpoint(true)->is_connected()) {
- its_subscription->get_endpoint(true)->restart();
+ its_subscription, its_offer_type)) {
+ its_subscription->set_acknowledged(false);
+ const std::size_t options_size_after =
+ _resubscribes->back().second->get_options().size();
+ const std::size_t diff = options_size_after - options_size_before;
+ _resubscribes->back().first =
+ static_cast<std::uint16_t>(
+ _resubscribes->back().first + (12u * diff - 12u));
+ } else {
+ _resubscribes->back().first =
+ static_cast<std::uint16_t>(
+ _resubscribes->back().first - 28);
+ }
+ } else if (its_offer_type == remote_offer_type_e::RELIABLE &&
+ its_reliable) {
+ if (its_reliable->is_connected()) {
+ // 28 = 16 (subscription) + 12 (option)
+ check_space(28);
+ const std::size_t options_size_before =
+ _resubscribes->back().second->get_options().size();
+ if (insert_subscription(_resubscribes->back().second,
+ _service, _instance,
+ its_eventgroup.first,
+ its_subscription, its_offer_type)) {
+ its_subscription->set_acknowledged(false);
+ const std::size_t options_size_after =
+ _resubscribes->back().second->get_options().size();
+ const std::size_t diff = options_size_after - options_size_before;
+ _resubscribes->back().first =
+ static_cast<std::uint16_t>(
+ _resubscribes->back().first + (12u * diff - 12u));
+ } else {
+ _resubscribes->back().first =
+ static_cast<std::uint16_t>(
+ _resubscribes->back().first - 28);
+ }
+ } else {
+ its_client.second->set_tcp_connection_established(false);
+ // restart TCP endpoint if not connected
+ its_reliable->restart();
+ }
+ } else if (its_offer_type == remote_offer_type_e::RELIABLE_UNRELIABLE) {
+ if (its_reliable && its_unreliable &&
+ its_reliable->is_connected() &&
+ its_unreliable->is_connected()) {
+ // 40 = 16 (subscription) + 2x12 (option)
+ check_space(40);
+ const std::size_t options_size_before =
+ _resubscribes->back().second->get_options().size();
+ if (insert_subscription(_resubscribes->back().second,
+ _service, _instance,
+ its_eventgroup.first,
+ its_subscription, its_offer_type)) {
+ its_subscription->set_acknowledged(false);
+ const std::size_t options_size_after =
+ _resubscribes->back().second->get_options().size();
+ const std::size_t diff = options_size_after - options_size_before;
+ _resubscribes->back().first =
+ static_cast<std::uint16_t>(
+ _resubscribes->back().first + (12u * diff - 24u));
+ } else {
+ _resubscribes->back().first =
+ static_cast<std::uint16_t>(
+ _resubscribes->back().first - 40);
+ }
+ } else if (its_reliable && !its_reliable->is_connected()) {
+ its_client.second->set_tcp_connection_established(false);
+ // restart TCP endpoint if not connected
+ its_reliable->restart();
}
+ } else {
+ VSOMEIP_WARNING << __func__ << ": unknown remote offer type ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "]";
}
- its_subscription->set_acknowledged(false);
} else {
// 56 = 2x16 (subscription) + 2x12 (option)
check_space(56);
const std::size_t options_size_before =
_resubscribes->back().second->get_options().size();
- insert_nack_subscription_on_resubscribe(_resubscribes->back().second,
+ if (insert_nack_subscription_on_resubscribe(_resubscribes->back().second,
_service, _instance, its_eventgroup.first,
- its_subscription);
- const std::size_t options_size_after =
- _resubscribes->back().second->get_options().size();
- const std::size_t diff = options_size_after - options_size_before;
- _resubscribes->back().first =
- static_cast<std::uint16_t>(
- _resubscribes->back().first + (12u * diff - 24u));
+ its_subscription, its_offer_type) ) {
+ const std::size_t options_size_after =
+ _resubscribes->back().second->get_options().size();
+ const std::size_t diff = options_size_after - options_size_before;
+ _resubscribes->back().first =
+ static_cast<std::uint16_t>(
+ _resubscribes->back().first + (12u * diff - 24u));
+ } else {
+ _resubscribes->back().first =
+ static_cast<std::uint16_t>(
+ _resubscribes->back().first - 56u);
+ }
// restart TCP endpoint if not connected
- if (its_subscription->get_endpoint(true)
- && !its_subscription->get_endpoint(true)->is_connected()) {
- its_subscription->get_endpoint(true)->restart();
+ if (its_reliable && !its_reliable->is_connected()) {
+ its_reliable->restart();
}
}
}
@@ -1592,7 +1691,8 @@ void service_discovery_impl::on_endpoint_connected(
auto found_instance = found_service->second.find(_instance);
if (found_instance != found_service->second.end()) {
if(0 < found_instance->second.size()) {
-
+ const remote_offer_type_e its_offer_type =
+ get_remote_offer_type(_service, _instance);
for(const auto &its_eventgroup : found_instance->second) {
for(const auto &its_client : its_eventgroup.second) {
if (its_client.first != VSOMEIP_ROUTING_CLIENT) {
@@ -1637,7 +1737,6 @@ void service_discovery_impl::on_endpoint_connected(
std::shared_ptr<endpoint> its_unreliable;
std::shared_ptr<endpoint> its_reliable;
get_subscription_endpoints(
- its_subscription->get_subscription_type(),
its_unreliable, its_reliable, &its_address,
&has_address, _service, _instance,
its_client.first);
@@ -1645,7 +1744,7 @@ void service_discovery_impl::on_endpoint_connected(
its_subscription->set_endpoint(its_unreliable, false);
insert_subscription(its_message, _service,
_instance, its_eventgroup.first,
- its_subscription, true, true);
+ its_subscription, its_offer_type);
its_subscription->set_acknowledged(false);
}
}
@@ -2482,6 +2581,8 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _
if (found_service != subscribed_.end()) {
auto found_instance = found_service->second.find(_instance);
if (found_instance != found_service->second.end()) {
+ const remote_offer_type_e its_offer_type =
+ get_remote_offer_type(_service, _instance);
for (auto found_eventgroup : found_instance->second) {
auto found_client = found_eventgroup.second.find(_client);
if (found_client != found_eventgroup.second.end()) {
@@ -2490,7 +2591,6 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _
bool has_address(false);
boost::asio::ip::address its_address;
get_subscription_endpoints(
- found_client->second->get_subscription_type(),
its_unreliable, its_reliable, &its_address,
&has_address, _service, _instance,
found_client->first);
@@ -2526,7 +2626,7 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _
if (its_reliable->is_connected() && its_unreliable->is_connected()) {
insert_subscription(its_message, _service,
_instance, found_eventgroup.first,
- found_client->second, true, true);
+ found_client->second, its_offer_type);
found_client->second->set_tcp_connection_established(true);
found_client->second->set_udp_connection_established(true);
} else {
@@ -2538,7 +2638,7 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _
if(endpoint->is_connected()) {
insert_subscription(its_message, _service,
_instance, found_eventgroup.first,
- found_client->second, _reliable, !_reliable);
+ found_client->second, its_offer_type);
found_client->second->set_tcp_connection_established(true);
} else {
// don't insert reliable endpoint option if the
@@ -2549,7 +2649,7 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _
if (endpoint->is_connected()) {
insert_subscription(its_message, _service,
_instance, found_eventgroup.first,
- found_client->second, _reliable, !_reliable);
+ found_client->second, its_offer_type);
found_client->second->set_udp_connection_established(true);
} else {
// don't insert unreliable endpoint option if the
@@ -3517,5 +3617,78 @@ void service_discovery_impl::stop_last_msg_received_timer() {
last_msg_received_timer_.cancel(ec);
}
+service_discovery_impl::remote_offer_type_e service_discovery_impl::get_remote_offer_type(
+ service_t _service, instance_t _instance) {
+ std::lock_guard<std::mutex> its_lock(remote_offer_types_mutex_);
+ auto found_si = remote_offer_types_.find(std::make_pair(_service, _instance));
+ if (found_si != remote_offer_types_.end()) {
+ return found_si->second;
+ }
+ return remote_offer_type_e::UNKNOWN;
+}
+
+bool service_discovery_impl::update_remote_offer_type(
+ service_t _service, instance_t _instance,
+ remote_offer_type_e _offer_type,
+ const boost::asio::ip::address &_reliable_address,
+ const boost::asio::ip::address &_unreliable_address) {
+ bool ret(false);
+ std::lock_guard<std::mutex> its_lock(remote_offer_types_mutex_);
+ const std::pair<service_t, instance_t> its_si_pair = std::make_pair(_service, _instance);
+ auto found_si = remote_offer_types_.find(its_si_pair);
+ if (found_si != remote_offer_types_.end()) {
+ if (found_si->second != _offer_type ) {
+ found_si->second = _offer_type;
+ ret = true;
+ }
+ } else {
+ remote_offer_types_[its_si_pair] = _offer_type;
+ }
+ switch (_offer_type) {
+ case remote_offer_type_e::UNRELIABLE:
+ remote_offers_by_ip_[_unreliable_address].insert(its_si_pair);
+ break;
+ case remote_offer_type_e::RELIABLE:
+ remote_offers_by_ip_[_reliable_address].insert(its_si_pair);
+ break;
+ case remote_offer_type_e::RELIABLE_UNRELIABLE:
+ remote_offers_by_ip_[_unreliable_address].insert(its_si_pair);
+ break;
+ case remote_offer_type_e::UNKNOWN:
+ default:
+ VSOMEIP_WARNING << __func__ << ": unkown offer type ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "]"
+ << _offer_type;
+ break;
+ }
+ return ret;
+}
+
+void service_discovery_impl::remove_remote_offer_type(
+ service_t _service, instance_t _instance,
+ const boost::asio::ip::address &_address) {
+ std::lock_guard<std::mutex> its_lock(remote_offer_types_mutex_);
+ const std::pair<service_t, instance_t> its_si_pair =
+ std::make_pair(_service, _instance);
+ remote_offer_types_.erase(its_si_pair);
+ auto found_services = remote_offers_by_ip_.find(_address);
+ if (found_services != remote_offers_by_ip_.end()) {
+ found_services->second.erase(its_si_pair);
+ }
+}
+
+void service_discovery_impl::remove_remote_offer_type_by_ip(
+ const boost::asio::ip::address &_address) {
+ std::lock_guard<std::mutex> its_lock(remote_offer_types_mutex_);
+ auto found_services = remote_offers_by_ip_.find(_address);
+ if (found_services != remote_offers_by_ip_.end()) {
+ for (const auto& si : found_services->second) {
+ remote_offer_types_.erase(si);
+ }
+ }
+ remote_offers_by_ip_.erase(_address);
+}
+
} // namespace sd
} // namespace vsomeip