diff options
36 files changed, 1113 insertions, 703 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 1df6f11..880cccb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -47,7 +47,7 @@ if (${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD") set(DL_LIBRARY "")
set(EXPORTSYMBOLS "") set(NO_DEPRECATED "-Wno-deprecated")
- set(OPTIMIZE "-Os")
+ set(OPTIMIZE "")
endif (${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD")
# Compiler settings
@@ -83,11 +83,17 @@ add_library(vsomeip-sd SHARED ${vsomeip-sd_SRC}) target_link_libraries(vsomeip-sd vsomeip ${Boost_LIBRARIES} rt ${DL_LIBRARY})
# Examples
-add_executable(client-sample examples/client-sample.cpp)
-target_link_libraries(client-sample vsomeip ${Boost_LIBRARIES} ${DL_LIBRARY})
+add_executable(request-sample examples/request-sample.cpp)
+target_link_libraries(request-sample vsomeip ${Boost_LIBRARIES} ${DL_LIBRARY})
-add_executable(service-sample examples/service-sample.cpp)
-target_link_libraries(service-sample vsomeip ${Boost_LIBRARIES} ${DL_LIBRARY})
+add_executable(response-sample examples/response-sample.cpp)
+target_link_libraries(response-sample vsomeip ${Boost_LIBRARIES} ${DL_LIBRARY})
+
+add_executable(subscribe-sample examples/subscribe-sample.cpp)
+target_link_libraries(subscribe-sample vsomeip ${Boost_LIBRARIES} ${DL_LIBRARY})
+
+add_executable(notify-sample examples/notify-sample.cpp)
+target_link_libraries(notify-sample vsomeip ${Boost_LIBRARIES} ${DL_LIBRARY})
# Test
add_executable(configuration-test test/configuration-test.cpp)
diff --git a/config/vsomeip-udp-client.json b/config/vsomeip-udp-client.json index 1cdc55d..fe50570 100644 --- a/config/vsomeip-udp-client.json +++ b/config/vsomeip-udp-client.json @@ -1,9 +1,9 @@ { - "unicast" : "192.168.56.104", + "unicast" : "192.168.56.101", "netmask" : "255.255.255.0", "logging" : { - "level" : "trace", + "level" : "debug", "console" : "true", "file" : { "enable" : "true", "path" : "/var/log/vsomeip.log" }, "dlt" : "true" @@ -31,7 +31,7 @@ [ { "name" : "remote", - "unicast" : "192.168.56.101", + "unicast" : "192.168.56.104", "services" : [ { @@ -62,12 +62,12 @@ [ { "eventgroup" : "0x4455", - "events" : [ "0x777", "0x778" ] + "events" : [ "0x777", "0x778" ] }, { "eventgroup" : "0x4465", "events" : [ "0x778", "0x779" ], - "is_multicast" : "true" + "is_multicast" : "true" }, { "eventgroup" : "0x4555", diff --git a/config/vsomeip-udp-service.json b/config/vsomeip-udp-service.json index 406f2f3..7f19ad3 100644 --- a/config/vsomeip-udp-service.json +++ b/config/vsomeip-udp-service.json @@ -1,8 +1,8 @@ { - "unicast" : "192.168.56.101", + "unicast" : "192.168.56.104", "logging" : { - "level" : "trace", + "level" : "debug", "console" : "true", "file" : { "enable" : "false", "path" : "/tmp/vsomeip.log" }, "dlt" : "false" @@ -63,12 +63,12 @@ [ { "eventgroup" : "0x4455", - "events" : [ "0x777", "0x778" ] + "events" : [ "0x777", "0x778" ] }, { "eventgroup" : "0x4465", - "events" : [ "0x778", "0x779" ], - "is_multicast" : "true" + "events" : [ "0x778", "0x779" ], + "is_multicast" : "true" }, { "eventgroup" : "0x4555", diff --git a/examples/notify-sample.cpp b/examples/notify-sample.cpp new file mode 100644 index 0000000..7661e89 --- /dev/null +++ b/examples/notify-sample.cpp @@ -0,0 +1,168 @@ +// Copyright (C) 2014 BMW Group +// Author: Lutz Bichler (lutz.bichler@bmw.de) +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#include <chrono> +#include <condition_variable> +#include <iomanip> +#include <iostream> +#include <sstream> +#include <thread> + +#include <vsomeip/vsomeip.hpp> + +#include "sample-ids.hpp" + +class service_sample { + public: + service_sample(bool _use_tcp, uint32_t _cycle) + : app_(vsomeip::runtime::get()->create_application()), + is_registered_(false), + use_tcp_(_use_tcp), + cycle_(_cycle), + offer_thread_(std::bind(&service_sample::run, this)), + notify_thread_(std::bind(&service_sample::notify, this)), + is_offered_(false) { + } + + void init() { + std::lock_guard < std::mutex > its_lock(mutex_); + + app_->init(); + app_->register_event_handler( + std::bind(&service_sample::on_event, this, std::placeholders::_1)); + + blocked_ = true; + condition_.notify_one(); + } + + void start() { + app_->start(); + } + + void offer() { + std::lock_guard < std::mutex > its_lock(notify_mutex_); + app_->offer_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID); + is_offered_ = true; + notify_condition_.notify_one(); + } + + void stop_offer() { + app_->stop_offer_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID); + is_offered_ = false; + } + + void on_event(vsomeip::event_type_e _event) { + VSOMEIP_INFO << "Application " << app_->get_name() << " is " + << (_event == vsomeip::event_type_e::REGISTERED ? + "registered." : "deregistered."); + + if (_event == vsomeip::event_type_e::REGISTERED) { + if (!is_registered_) { + is_registered_ = true; + } + } else { + is_registered_ = false; + } + } + + void run() { + std::unique_lock < std::mutex > its_lock(mutex_); + while (!blocked_) + condition_.wait(its_lock); + + bool is_offer(true); + while (true) { + if (is_offer) + offer(); + else + stop_offer(); + std::this_thread::sleep_for(std::chrono::milliseconds(10000)); + is_offer = !is_offer; + } + } + + void notify() { + std::shared_ptr<vsomeip::payload> its_payload + = vsomeip::runtime::get()->create_payload(); + + vsomeip::byte_t its_data[256]; + uint32_t its_size = 1; + + while (true) { + VSOMEIP_INFO << "notify: started..."; + std::unique_lock < std::mutex > its_lock(notify_mutex_); + while (!is_offered_) + notify_condition_.wait(its_lock); + + VSOMEIP_INFO << "notify: unblocked ..."; + while (is_offered_) { + if (its_size == sizeof(its_data)) + its_size = 1; + + for (uint32_t i = 0; i < its_size; ++i) + its_data[i] = (i % 256); + + its_payload->set_data(its_data, its_size); + + VSOMEIP_INFO << "Setting event (Length=" << std::dec << its_size << ")."; + app_->set(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID, SAMPLE_EVENT_ID, its_payload); + + its_size++; + + std::this_thread::sleep_for(std::chrono::milliseconds(cycle_)); + } + } + } + +private: + std::shared_ptr<vsomeip::application> app_; + bool is_registered_; + bool use_tcp_; + uint32_t cycle_; + + std::thread offer_thread_; + std::mutex mutex_; + std::condition_variable condition_; + bool blocked_; + + std::thread notify_thread_; + std::mutex notify_mutex_; + std::condition_variable notify_condition_; + bool is_offered_; +}; + +int main(int argc, char **argv) { + bool use_tcp = false; + uint32_t cycle = 1000; // default 1s + + std::string tcp_enable("--tcp"); + std::string udp_enable("--udp"); + std::string cycle_arg("--cycle"); + + for (int i = 1; i < argc; i++) { + if (tcp_enable == argv[i]) { + use_tcp = true; + break; + } + if (udp_enable == argv[i]) { + use_tcp = false; + break; + } + + if (cycle_arg == argv[i] && i+1 < argc) { + i++; + std::stringstream converter; + converter << argv[i]; + converter >> cycle; + } + } + + service_sample its_sample(use_tcp, cycle); + its_sample.init(); + its_sample.start(); + + return 0; +} diff --git a/examples/request-sample.cpp b/examples/request-sample.cpp index c500d77..95b5e8d 100644 --- a/examples/request-sample.cpp +++ b/examples/request-sample.cpp @@ -55,9 +55,6 @@ public: this, std::placeholders::_1)); - app_->subscribe(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID, SAMPLE_EVENTGROUP_ID, - vsomeip::ANY_MAJOR, vsomeip::ANY_TTL); - request_->set_service(SAMPLE_SERVICE_ID); request_->set_instance(SAMPLE_INSTANCE_ID); request_->set_method(SAMPLE_METHOD_ID); @@ -75,7 +72,6 @@ public: void on_event(vsomeip::event_type_e _event) { if (_event == vsomeip::event_type_e::REGISTERED) { - //app_->offer_service(OTHER_SAMPLE_SERVICE_ID, OTHER_SAMPLE_INSTANCE_ID); app_->request_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID); } } diff --git a/examples/response-sample.cpp b/examples/response-sample.cpp index 98b8c66..43f4abe 100644 --- a/examples/response-sample.cpp +++ b/examples/response-sample.cpp @@ -21,8 +21,7 @@ class service_sample { : app_(vsomeip::runtime::get()->create_application()), is_registered_(false), use_tcp_(_use_tcp), - offer_thread_(std::bind(&service_sample::run, this)), - notification_thread_(std::bind(&service_sample::notify, this)) { + offer_thread_(std::bind(&service_sample::run, this)) { } void init() { @@ -45,36 +44,11 @@ class service_sample { } void offer() { - VSOMEIP_INFO << "Offering service."; - //std::lock_guard < std::mutex > its_lock(notification_mutex_); app_->offer_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID); - - notification_blocked_ = true; - //condition_.notify_one(); } void stop_offer() { - VSOMEIP_INFO << "Stop offering service."; app_->stop_offer_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID); - notification_blocked_ = false; - } - - void update() { - static std::shared_ptr<vsomeip::payload> its_payload = - vsomeip::runtime::get()->create_payload(); - - static vsomeip::byte_t its_data[1000]; - static uint32_t its_size = 0; - - its_size++; - for (uint32_t i = 0; i < its_size; ++i) - its_data[i] = (i % 256); - - its_payload->set_data(its_data, its_size); - - VSOMEIP_INFO << "Updating event to " << its_size << " bytes."; - app_->set(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID, SAMPLE_EVENT_ID, - its_payload); } void on_event(vsomeip::event_type_e _event) { @@ -127,19 +101,7 @@ class service_sample { } } - void notify() { - while (true) { - //std::unique_lock < std::mutex > its_lock(notification_mutex_); - //while (!notification_blocked_) - // notification_condition_.wait(its_lock); - //while (notification_blocked_) { - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - if (notification_blocked_) update(); - //} - } - } - - private: +private: std::shared_ptr<vsomeip::application> app_; bool is_registered_; bool use_tcp_; @@ -148,11 +110,6 @@ class service_sample { std::mutex mutex_; std::condition_variable condition_; bool blocked_; - - std::thread notification_thread_; - std::mutex notification_mutex_; - std::condition_variable notification_condition_; - bool notification_blocked_; }; int main(int argc, char **argv) { diff --git a/examples/subscribe-sample.cpp b/examples/subscribe-sample.cpp new file mode 100644 index 0000000..bb1beaf --- /dev/null +++ b/examples/subscribe-sample.cpp @@ -0,0 +1,92 @@ +// Copyright (C) 2014 BMW Group +// Author: Lutz Bichler (lutz.bichler@bmw.de) +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#include <chrono> +#include <condition_variable> +#include <iomanip> +#include <iostream> +#include <sstream> +#include <thread> + +#include <vsomeip/vsomeip.hpp> + +#include "sample-ids.hpp" + +class client_sample { + public: + client_sample(bool _use_tcp) + : app_(vsomeip::runtime::get()->create_application()), + use_tcp_(_use_tcp) { + } + + void init() { + app_->init(); + + VSOMEIP_INFO<< "Client settings [protocol=" + << (use_tcp_ ? "TCP" : "UDP") + << "]"; + + app_->register_message_handler( + vsomeip::ANY_SERVICE, SAMPLE_INSTANCE_ID, vsomeip::ANY_METHOD, + std::bind(&client_sample::on_message, this, std::placeholders::_1)); + + app_->subscribe(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID, + SAMPLE_EVENTGROUP_ID); + } + + void start() { + app_->start(); + } + + void on_message(std::shared_ptr<vsomeip::message> &_response) { + std::stringstream its_message; + its_message << "Received a notification from Service [" << std::setw(4) + << std::setfill('0') << std::hex << _response->get_service() + << "." << std::setw(4) << std::setfill('0') << std::hex + << _response->get_instance() << "] to Client/Session [" + << std::setw(4) << std::setfill('0') << std::hex + << _response->get_client() << "/" << std::setw(4) + << std::setfill('0') << std::hex << _response->get_session() + << "] = "; + std::shared_ptr<vsomeip::payload> its_payload = _response->get_payload(); + its_message << "(" << std::dec << its_payload->get_length() << ")"; +#if 0 + for (uint32_t i = 0; i < its_payload->get_length(); ++i) + its_message << std::hex << std::setw(2) << std::setfill('0') + << (int) its_payload->get_data()[i] << " "; +#endif + VSOMEIP_INFO << its_message.str(); + } + +private: + std::shared_ptr< vsomeip::application > app_; + bool use_tcp_; + bool be_quiet_; +}; + +int main(int argc, char **argv) { + bool use_tcp = false; + bool be_quiet = false; + uint32_t cycle = 1000; // Default: 1s + + std::string tcp_enable("--tcp"); + std::string udp_enable("--udp"); + + int i = 1; + while (i < argc) { + if (tcp_enable == argv[i]) { + use_tcp = true; + } else if (udp_enable == argv[i]) { + use_tcp = false; + } + i++; + } + + client_sample its_sample(use_tcp); + its_sample.init(); + its_sample.start(); + return 0; +} diff --git a/implementation/configuration/include/internal.hpp b/implementation/configuration/include/internal.hpp index e4b75cd..910cb3d 100644 --- a/implementation/configuration/include/internal.hpp +++ b/implementation/configuration/include/internal.hpp @@ -34,28 +34,26 @@ namespace vsomeip { #define VSOMEIP_COMMAND_SIZE_POS_MAX 5 #define VSOMEIP_COMMAND_PAYLOAD_POS 7 -#define VSOMEIP_REGISTER_APPLICATION 0x00 -#define VSOMEIP_DEREGISTER_APPLICATION 0x01 -#define VSOMEIP_APPLICATION_LOST 0x02 -#define VSOMEIP_ROUTING_INFO 0x03 - -#define VSOMEIP_PING 0x0E -#define VSOMEIP_PONG 0x0F - -#define VSOMEIP_OFFER_SERVICE 0x10 -#define VSOMEIP_STOP_OFFER_SERVICE 0x11 -#define VSOMEIP_PUBLISH_EVENTGROUP 0x12 -#define VSOMEIP_STOP_PUBLISH_EVENTGROUP 0x13 -#define VSOMEIP_ADD_EVENT 0x14 -#define VSOMEIP_ADD_FIELD 0x15 -#define VSOMEIP_REMOVE_EVENT_OR_FIELD 0x16 -#define VSOMEIP_SEND 0x17 -#define VSOMEIP_SET 0x18 - -#define VSOMEIP_OFFER_SERVICE_COMMAND_SIZE 20 -#define VSOMEIP_STOP_OFFER_SERVICE_COMMAND_SIZE 11 -#define VSOMEIP_PUBLISH_EVENTGROUP_COMMAND_SIZE 18 -#define VSOMEIP_STOP_PUBLISH_EVENTGROUP_COMMAND_SIZE 13 +#define VSOMEIP_REGISTER_APPLICATION 0x00 +#define VSOMEIP_DEREGISTER_APPLICATION 0x01 +#define VSOMEIP_APPLICATION_LOST 0x02 +#define VSOMEIP_ROUTING_INFO 0x03 + +#define VSOMEIP_PING 0x0E +#define VSOMEIP_PONG 0x0F + +#define VSOMEIP_OFFER_SERVICE 0x10 +#define VSOMEIP_STOP_OFFER_SERVICE 0x11 +#define VSOMEIP_SUBSCRIBE 0x12 +#define VSOMEIP_UNSUBSCRIBE 0x13 + +#define VSOMEIP_SEND 0x17 +#define VSOMEIP_SET 0x18 + +#define VSOMEIP_OFFER_SERVICE_COMMAND_SIZE 20 +#define VSOMEIP_STOP_OFFER_SERVICE_COMMAND_SIZE 11 +#define VSOMEIP_SUBSCRIBE_COMMAND_SIZE 18 +#define VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE 13 } // namespace vsomeip diff --git a/implementation/endpoints/include/client_endpoint_impl.hpp b/implementation/endpoints/include/client_endpoint_impl.hpp index f88ea93..7e460b6 100644 --- a/implementation/endpoints/include/client_endpoint_impl.hpp +++ b/implementation/endpoints/include/client_endpoint_impl.hpp @@ -38,7 +38,7 @@ public: virtual ~client_endpoint_impl();
bool send(const uint8_t *_data, uint32_t _size, bool _flush);
- bool send_to(const boost::asio::ip::address &_address, uint16_t _port,
+ bool send_to(const std::shared_ptr<endpoint_definition> _target,
const byte_t *_data, uint32_t _size, bool _flush = true);
bool flush();
diff --git a/implementation/endpoints/include/endpoint.hpp b/implementation/endpoints/include/endpoint.hpp index f1e58aa..3a636be 100644 --- a/implementation/endpoints/include/endpoint.hpp +++ b/implementation/endpoints/include/endpoint.hpp @@ -13,6 +13,8 @@ namespace vsomeip {
+class endpoint_definition;
+
class endpoint
{
public:
@@ -24,7 +26,7 @@ public: virtual bool is_connected() const = 0;
virtual bool send(const byte_t *_data, uint32_t _size, bool _flush = true) = 0;
- virtual bool send_to(const boost::asio::ip::address &_address, uint16_t _port,
+ virtual bool send_to(std::shared_ptr<endpoint_definition> _target,
const byte_t *_data, uint32_t _size, bool _flush = true) = 0;
virtual void enable_magic_cookies() = 0;
virtual void receive() = 0;
diff --git a/implementation/endpoints/include/local_server_endpoint_impl.hpp b/implementation/endpoints/include/local_server_endpoint_impl.hpp index 11e26be..05c1122 100644 --- a/implementation/endpoints/include/local_server_endpoint_impl.hpp +++ b/implementation/endpoints/include/local_server_endpoint_impl.hpp @@ -38,8 +38,8 @@ public: void restart();
void receive();
- bool send_to(const boost::asio::ip::address &_address, uint16_t _port,
- const byte_t *_data, uint32_t _size, bool _flush);
+ bool send_to(const std::shared_ptr<endpoint_definition>,
+ const byte_t *_data, uint32_t _size, bool _flush);
void send_queued(endpoint_type _target, message_buffer_ptr_t _data);
endpoint_type get_remote() const;
diff --git a/implementation/endpoints/include/tcp_server_endpoint_impl.hpp b/implementation/endpoints/include/tcp_server_endpoint_impl.hpp index 623634c..0aea5d8 100644 --- a/implementation/endpoints/include/tcp_server_endpoint_impl.hpp +++ b/implementation/endpoints/include/tcp_server_endpoint_impl.hpp @@ -32,7 +32,7 @@ public: void start();
void stop();
- bool send_to(const boost::asio::ip::address &_address, uint16_t _port,
+ bool send_to(const std::shared_ptr<endpoint_definition> _target,
const byte_t *_data, uint32_t _size, bool _flush);
void send_queued(endpoint_type _target, message_buffer_ptr_t _buffer);
diff --git a/implementation/endpoints/include/udp_server_endpoint_impl.hpp b/implementation/endpoints/include/udp_server_endpoint_impl.hpp index da1a98a..8e843b9 100644 --- a/implementation/endpoints/include/udp_server_endpoint_impl.hpp +++ b/implementation/endpoints/include/udp_server_endpoint_impl.hpp @@ -38,8 +38,8 @@ public: void restart();
void receive();
- bool send_to(const boost::asio::ip::address &_address, uint16_t _port,
- const byte_t *_data, uint32_t _size, bool _flush);
+ bool send_to(const std::shared_ptr<endpoint_definition> _target,
+ const byte_t *_data, uint32_t _size, bool _flush);
void send_queued(endpoint_type _target, message_buffer_ptr_t _buffer);
endpoint_type get_remote() const;
diff --git a/implementation/endpoints/src/client_endpoint_impl.cpp b/implementation/endpoints/src/client_endpoint_impl.cpp index ce16386..f71b5a3 100644 --- a/implementation/endpoints/src/client_endpoint_impl.cpp +++ b/implementation/endpoints/src/client_endpoint_impl.cpp @@ -65,7 +65,7 @@ void client_endpoint_impl<Protocol, MaxBufferSize>::restart() { template<typename Protocol, int MaxBufferSize>
bool client_endpoint_impl<Protocol, MaxBufferSize>::send_to(
- const boost::asio::ip::address &_address, uint16_t _port,
+ const std::shared_ptr<endpoint_definition> _target,
const byte_t *_data, uint32_t _size, bool _flush) {
VSOMEIP_ERROR
<< "Clients endpoints must not be used to send to explicitely specified targets";
@@ -79,7 +79,7 @@ bool client_endpoint_impl<Protocol, MaxBufferSize>::send(const uint8_t *_data, std::unique_lock < std::mutex > its_lock(mutex_);
#if 0
std::stringstream msg;
- msg << "cei<" << this << ">::send: ";
+ msg << "cei::send: ";
for (uint32_t i = 0; i < _size; i++)
msg << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " ";
VSOMEIP_DEBUG << msg.str();
@@ -103,7 +103,7 @@ bool client_endpoint_impl<Protocol, MaxBufferSize>::send(const uint8_t *_data, this->shared_from_this(), std::placeholders::_1));
}
- return true;
+ return (true);
}
template<typename Protocol, int MaxBufferSize>
diff --git a/implementation/endpoints/src/local_server_endpoint_impl.cpp b/implementation/endpoints/src/local_server_endpoint_impl.cpp index 5f2357f..332db0b 100644 --- a/implementation/endpoints/src/local_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/local_server_endpoint_impl.cpp @@ -46,7 +46,7 @@ void local_server_endpoint_impl::stop() { }
-bool local_server_endpoint_impl::send_to(const boost::asio::ip::address &_address, uint16_t _port,
+bool local_server_endpoint_impl::send_to(const std::shared_ptr<endpoint_definition> _target,
const byte_t *_data, uint32_t _size, bool _flush) {
return false;
}
diff --git a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp index 616f273..b3b6620 100644 --- a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp @@ -11,6 +11,7 @@ #include <vsomeip/constants.hpp>
#include <vsomeip/logger.hpp>
+#include "../include/endpoint_definition.hpp"
#include "../include/endpoint_host.hpp"
#include "../include/tcp_server_endpoint_impl.hpp"
#include "../../utility/include/utility.hpp"
@@ -50,9 +51,9 @@ void tcp_server_endpoint_impl::stop() { }
bool tcp_server_endpoint_impl::send_to(
- const boost::asio::ip::address &_address, uint16_t _port,
+ const std::shared_ptr<endpoint_definition> _target,
const byte_t *_data, uint32_t _size, bool _flush) {
- endpoint_type its_target(_address, _port);
+ endpoint_type its_target(_target->get_address(), _target->get_port());
return send_intern(its_target, _data, _size, _flush);
}
diff --git a/implementation/endpoints/src/udp_client_endpoint_impl.cpp b/implementation/endpoints/src/udp_client_endpoint_impl.cpp index 585b056..e6da818 100644 --- a/implementation/endpoints/src/udp_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/udp_client_endpoint_impl.cpp @@ -45,7 +45,7 @@ void udp_client_endpoint_impl::start() { void udp_client_endpoint_impl::send_queued(message_buffer_ptr_t _buffer) {
#if 0
std::stringstream msg;
- msg << "ucei<" << this << ">::sq: ";
+ msg << "ucei<" << remote_.address() << ":" << std::dec << remote_.port() << ">::sq: ";
for (std::size_t i = 0; i < _buffer->size(); i++)
msg << std::hex << std::setw(2) << std::setfill('0') << (int)(*_buffer)[i] << " ";
VSOMEIP_DEBUG << msg.str();
diff --git a/implementation/endpoints/src/udp_server_endpoint_impl.cpp b/implementation/endpoints/src/udp_server_endpoint_impl.cpp index b63c3a5..84cddcf 100644 --- a/implementation/endpoints/src/udp_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/udp_server_endpoint_impl.cpp @@ -7,11 +7,11 @@ #include <iomanip>
#include <sstream>
-#include <boost/asio/ip/address.hpp>
#include <boost/asio/ip/multicast.hpp>
#include <vsomeip/logger.hpp>
+#include "../include/endpoint_definition.hpp"
#include "../include/endpoint_host.hpp"
#include "../include/udp_server_endpoint_impl.hpp"
#include "../../utility/include/byteorder.hpp"
@@ -63,9 +63,9 @@ void udp_server_endpoint_impl::restart() { }
bool udp_server_endpoint_impl::send_to(
- const boost::asio::ip::address &_address, uint16_t _port,
+ const std::shared_ptr<endpoint_definition> _target,
const byte_t *_data, uint32_t _size, bool _flush) {
- endpoint_type its_target(_address, _port);
+ endpoint_type its_target(_target->get_address(), _target->get_port());
return send_intern(its_target, _data, _size, _flush);
}
diff --git a/implementation/routing/include/event.hpp b/implementation/routing/include/event.hpp index 2299279..52ad7da 100644 --- a/implementation/routing/include/event.hpp +++ b/implementation/routing/include/event.hpp @@ -10,11 +10,14 @@ #include <map> #include <memory> #include <mutex> +#include <set> #include <boost/asio/io_service.hpp> #include <boost/asio/ip/address.hpp> #include <boost/asio/system_timer.hpp> +#include <vsomeip/primitive_types.hpp> + namespace vsomeip { class endpoint; @@ -49,6 +52,11 @@ class event : public std::enable_shared_from_this<event> { // SIP_RPC_359 (epsilon change) is not supported! + + const std::set<eventgroup_t> & get_eventgroups() const; + void add_eventgroup(eventgroup_t _eventgroup); + void set_eventgroups(const std::set<eventgroup_t> &_eventgroups); + private: void update_cbk(boost::system::error_code const &_error); void notify(); @@ -64,6 +72,8 @@ class event : public std::enable_shared_from_this<event> { std::chrono::milliseconds cycle_; bool is_updating_on_change_; + + std::set<eventgroup_t> eventgroups_; }; } // namespace vsomeip diff --git a/implementation/routing/include/eventgroupinfo.hpp b/implementation/routing/include/eventgroupinfo.hpp index 5305e90..6c75ca3 100644 --- a/implementation/routing/include/eventgroupinfo.hpp +++ b/implementation/routing/include/eventgroupinfo.hpp @@ -16,11 +16,13 @@ namespace vsomeip { +class endpoint_definition; class event; class eventgroupinfo { public: eventgroupinfo(); + eventgroupinfo(major_version_t _major, ttl_t _ttl); ~eventgroupinfo(); major_version_t get_major() const; @@ -29,12 +31,18 @@ public: ttl_t get_ttl() const; void set_ttl(ttl_t _ttl); - bool get_multicast(boost::asio::ip::address &_address, uint16_t &_port); + bool is_multicast() const; + bool get_multicast(boost::asio::ip::address &_address, uint16_t &_port) const; void set_multicast(const boost::asio::ip::address &_address, uint16_t _port); const std::set<std::shared_ptr<event> > get_events() const; void add_event(std::shared_ptr<event> _event); + const std::set<std::shared_ptr<endpoint_definition> > get_targets() const; + void add_target(std::shared_ptr<endpoint_definition> _target); + void del_target(std::shared_ptr<endpoint_definition> _target); + void clear_targets(); + private: major_version_t major_; ttl_t ttl_; @@ -44,6 +52,7 @@ private: uint16_t port_; std::set<std::shared_ptr<event> > events_; + std::set<std::shared_ptr<endpoint_definition> > targets_; }; } // namespace vsomeip diff --git a/implementation/routing/include/routing_manager.hpp b/implementation/routing/include/routing_manager.hpp index 062576f..f169ac3 100644 --- a/implementation/routing/include/routing_manager.hpp +++ b/implementation/routing/include/routing_manager.hpp @@ -41,13 +41,6 @@ public: virtual void stop_offer_service(client_t _client, service_t _service, instance_t _instance) = 0; - virtual void publish_eventgroup(client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup, - major_version_t _major, ttl_t _ttl) = 0; - - virtual void stop_publish_eventgroup(client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup) = 0; - virtual void request_service(client_t _client, service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor, ttl_t _ttl) = 0; diff --git a/implementation/routing/include/routing_manager_impl.hpp b/implementation/routing/include/routing_manager_impl.hpp index ac41eec..3ba3b2f 100644 --- a/implementation/routing/include/routing_manager_impl.hpp +++ b/implementation/routing/include/routing_manager_impl.hpp @@ -15,6 +15,8 @@ #include <boost/asio/ip/address.hpp> #include <boost/asio/io_service.hpp> +#include <vsomeip/primitive_types.hpp> + #include "routing_manager.hpp" #include "../../endpoints/include/endpoint_host.hpp" #include "../../service_discovery/include/service_discovery_host.hpp" @@ -59,13 +61,6 @@ class routing_manager_impl : public routing_manager, public endpoint_host, void stop_offer_service(client_t _client, service_t _service, instance_t _instance); - void publish_eventgroup(client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup, - major_version_t _major, ttl_t _ttl); - - void stop_publish_eventgroup(client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup); - void request_service(client_t _client, service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor, ttl_t _ttl); @@ -114,8 +109,8 @@ class routing_manager_impl : public routing_manager, public endpoint_host, services_t get_offered_services(const std::string &_name) const; void create_service_discovery_endpoint(const std::string &_address, uint16_t _port, bool _reliable); - bool send_subscribe(const boost::asio::ip::address &_address, uint16_t _port, - bool _reliable, const byte_t *_data, uint32_t _size); + bool send_to(const boost::asio::ip::address &_address, uint16_t _port, + bool _reliable, const byte_t *_data, uint32_t _size); void init_routing_info(); void add_routing_info(service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor, @@ -125,20 +120,19 @@ class routing_manager_impl : public routing_manager, public endpoint_host, bool _reliable); void init_event_routing_info(); - void add_subscription(service_t _service, instance_t _instance, - eventgroup_t _eventgroup, - const boost::asio::ip::address &_address, - uint16_t _reliable_port, uint16_t _unreliable_port); - void del_subscription(service_t _service, instance_t _instance, - eventgroup_t _eventgroup, - const boost::asio::ip::address &_address, - uint16_t _reliable_port, uint16_t _unreliable_port); + void on_subscribe(service_t _service, instance_t _instance, eventgroup_t _eventgroup, + std::shared_ptr<endpoint_definition> _target); + void on_unsubscribe(service_t _service, instance_t _instance, eventgroup_t _eventgroup, + std::shared_ptr<endpoint_definition> _target); + void on_subscribe_ack(service_t _service, instance_t _instance, + const boost::asio::ip::address &_address, uint16_t _port); private: bool deliver_message(const byte_t *_data, length_t _length, instance_t _instance); client_t find_local_client(service_t _service, instance_t _instance); + std::set<client_t> find_local_clients(service_t _service, instance_t _instance, eventgroup_t _eventgroup); instance_t find_instance(service_t _service, endpoint *_endpoint); std::shared_ptr<serviceinfo> find_service(service_t _service, @@ -203,12 +197,13 @@ class routing_manager_impl : public routing_manager, public endpoint_host, // Eventgroups std::map<service_t, - std::map<instance_t, - std::map<eventgroup_t, std::shared_ptr<eventgroupinfo> > > > eventgroups_; + std::map<instance_t, + std::map<eventgroup_t, std::shared_ptr<eventgroupinfo> > > > eventgroups_; std::map<service_t, - std::map<instance_t, std::map<event_t, std::shared_ptr<event> > > > events_; - - std::map<eventgroup_t, std::set<client_t> > eventgroup_clients_; + std::map<instance_t, std::map<event_t, std::shared_ptr<event> > > > events_; + std::map<service_t, + std::map<instance_t, + std::map<eventgroup_t, std::set<client_t> > > > eventgroup_clients_; // Mutexes std::recursive_mutex endpoint_mutex_; diff --git a/implementation/routing/include/routing_manager_proxy.hpp b/implementation/routing/include/routing_manager_proxy.hpp index 4099349..f871ca2 100644 --- a/implementation/routing/include/routing_manager_proxy.hpp +++ b/implementation/routing/include/routing_manager_proxy.hpp @@ -40,13 +40,6 @@ public: void stop_offer_service(client_t _client, service_t _service, instance_t _instance); - void publish_eventgroup(client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup, - major_version_t _major, ttl_t _ttl); - - void stop_publish_eventgroup(client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup); - void request_service(client_t _client, service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor, ttl_t _ttl); diff --git a/implementation/routing/src/event.cpp b/implementation/routing/src/event.cpp index ebfc5fd..3a96227 100644 --- a/implementation/routing/src/event.cpp +++ b/implementation/routing/src/event.cpp @@ -87,6 +87,18 @@ void event::set_update_cycle(std::chrono::milliseconds &_cycle) { } } +const std::set<eventgroup_t> & event::get_eventgroups() const { + return eventgroups_; +} + +void event::add_eventgroup(eventgroup_t _eventgroup) { + eventgroups_.insert(_eventgroup); +} + +void event::set_eventgroups(const std::set<eventgroup_t> &_eventgroups) { + eventgroups_ = _eventgroups; +} + void event::update_cbk(boost::system::error_code const &_error) { if (!_error) { cycle_timer_.expires_from_now(cycle_); @@ -99,7 +111,7 @@ void event::update_cbk(boost::system::error_code const &_error) { } void event::notify() { - routing_->send(VSOMEIP_ROUTING_CLIENT, message_, true, true); + routing_->send(VSOMEIP_ROUTING_CLIENT, message_, true, false); } } // namespace vsomeip diff --git a/implementation/routing/src/eventgroupinfo.cpp b/implementation/routing/src/eventgroupinfo.cpp index bb9de5a..1838d8e 100644 --- a/implementation/routing/src/eventgroupinfo.cpp +++ b/implementation/routing/src/eventgroupinfo.cpp @@ -16,6 +16,12 @@ eventgroupinfo::eventgroupinfo() is_multicast_(false) { } +eventgroupinfo::eventgroupinfo(major_version_t _major, ttl_t _ttl) + : major_(_major), + ttl_(_ttl), + is_multicast_(false) { +} + eventgroupinfo::~eventgroupinfo() { } @@ -35,7 +41,11 @@ void eventgroupinfo::set_ttl(ttl_t _ttl) { ttl_ = _ttl; } -bool eventgroupinfo::get_multicast(boost::asio::ip::address &_address, uint16_t &_port) { +bool eventgroupinfo::is_multicast() const { + return is_multicast_; +} + +bool eventgroupinfo::get_multicast(boost::asio::ip::address &_address, uint16_t &_port) const { if (is_multicast_) { _address = address_; _port = port_; @@ -57,4 +67,20 @@ void eventgroupinfo::add_event(std::shared_ptr<event> _event) { events_.insert(_event); } +const std::set<std::shared_ptr<endpoint_definition> > eventgroupinfo::get_targets() const { + return targets_; +} + +void eventgroupinfo::add_target(std::shared_ptr<endpoint_definition> _target) { + targets_.insert(_target); +} + +void eventgroupinfo::del_target(std::shared_ptr<endpoint_definition> _target) { + targets_.erase(_target); +} + +void eventgroupinfo::clear_targets() { + targets_.clear(); +} + } // namespace vsomeip diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp index f9263a7..80d8fb9 100644 --- a/implementation/routing/src/routing_manager_impl.cpp +++ b/implementation/routing/src/routing_manager_impl.cpp @@ -21,6 +21,7 @@ #include "../include/servicegroup.hpp" #include "../include/serviceinfo.hpp" #include "../../configuration/include/internal.hpp" +#include "../../endpoints/include/endpoint_definition.hpp" #include "../../endpoints/include/local_client_endpoint_impl.hpp" #include "../../endpoints/include/tcp_client_endpoint_impl.hpp" #include "../../endpoints/include/tcp_server_endpoint_impl.hpp" @@ -48,7 +49,7 @@ routing_manager_impl::~routing_manager_impl() { } boost::asio::io_service & routing_manager_impl::get_io() { - return io_; + return (io_); } void routing_manager_impl::init() { @@ -65,9 +66,9 @@ void routing_manager_impl::init() { stub_->init(); if (configuration_->is_service_discovery_enabled()) { - VSOMEIP_INFO << "Service Discovery enabled."; + VSOMEIP_INFO<< "Service Discovery enabled."; sd::runtime **its_runtime = - static_cast<sd::runtime **>(utility::load_library( + static_cast<sd::runtime **>(utility::load_library( VSOMEIP_SD_LIBRARY, VSOMEIP_SD_RUNTIME_SYMBOL_STRING)); @@ -114,8 +115,6 @@ void routing_manager_impl::offer_service(client_t _client, service_t _service, host_->on_error(error_code_e::SERVICE_PROPERTY_MISMATCH); } } else { - VSOMEIP_INFO << "CREATING SERVICE [" << std::hex << _service << "." - << _instance << "]"; (void) create_service(_service, _instance, _major, _minor, _ttl); } @@ -140,21 +139,6 @@ void routing_manager_impl::stop_offer_service(client_t its_client, } } -void routing_manager_impl::publish_eventgroup(client_t its_client, - service_t _service, - instance_t _instance, - eventgroup_t _eventgroup, - major_version_t _major, - ttl_t _ttl) { -} - -void routing_manager_impl::stop_publish_eventgroup(client_t its_client, - service_t _service, - instance_t _instance, - eventgroup_t _eventgroup) { - -} - void routing_manager_impl::request_service(client_t _client, service_t _service, instance_t _instance, major_version_t _major, @@ -191,10 +175,12 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service, eventgroup_t _eventgroup, major_version_t _major, ttl_t _ttl) { if (discovery_) { - eventgroup_clients_[_eventgroup].insert(_client); - discovery_->subscribe(_service, _instance, _eventgroup, _major, _ttl); + eventgroup_clients_[_service][_instance][_eventgroup].insert(_client); + if (0 == find_local_client(_service, _instance)) { + discovery_->subscribe(_service, _instance, _eventgroup, _major, _ttl); + } } else { - VSOMEIP_ERROR << "SOME/IP eventgroups require SD to be enabled!"; + VSOMEIP_ERROR<< "SOME/IP eventgroups require SD to be enabled!"; } } @@ -202,24 +188,31 @@ void routing_manager_impl::unsubscribe(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup) { if (discovery_) { - auto found_eventgroup = eventgroup_clients_.find(_eventgroup); - if (found_eventgroup != eventgroup_clients_.end()) { - found_eventgroup->second.erase(_client); - if (0 == found_eventgroup->second.size()) { - eventgroup_clients_.erase(_eventgroup); + auto found_service = eventgroup_clients_.find(_service); + if (found_service != eventgroup_clients_.end()) { + auto found_instance = found_service->second.find(_instance); + if (found_instance != found_service->second.end()) { + auto found_eventgroup = found_instance->second.find(_eventgroup); + if (found_eventgroup != found_instance->second.end()) { + found_eventgroup->second.erase(_client); + if (0 == found_eventgroup->second.size()) { + eventgroup_clients_.erase(_eventgroup); + } + } } } discovery_->unsubscribe(_service, _instance, _eventgroup); } else { - VSOMEIP_ERROR << "SOME/IP eventgroups require SD to be enabled!"; + VSOMEIP_ERROR<< "SOME/IP eventgroups require SD to be enabled!"; } } bool routing_manager_impl::send(client_t its_client, std::shared_ptr<message> _message, - bool _reliable, bool _flush) { + bool _reliable, + bool _flush) { bool is_sent(false); - std::unique_lock < std::mutex > its_lock(serialize_mutex_); + std::unique_lock<std::mutex> its_lock(serialize_mutex_); if (utility::is_request(_message->get_message_type())) { _message->set_client(its_client); @@ -230,12 +223,13 @@ bool routing_manager_impl::send(client_t its_client, _message->get_instance(), _reliable, _flush); serializer_->reset(); } - return is_sent; + return (is_sent); } bool routing_manager_impl::send(client_t _client, const byte_t *_data, length_t _size, instance_t _instance, - bool _flush, bool _reliable) { + bool _flush, + bool _reliable) { bool is_sent(false); std::shared_ptr<endpoint> its_target; @@ -288,8 +282,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, if (its_target) { is_sent = its_target->send(_data, _size, _flush); } else { - VSOMEIP_ERROR - << "Routing error. Client from remote service could not be found!"; + VSOMEIP_ERROR<< "Routing error. Client from remote service could not be found!"; } } else { std::shared_ptr<serviceinfo> its_info( @@ -297,10 +290,36 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, if (its_info) { its_target = its_info->get_endpoint(_reliable); if (its_target) { - is_sent = its_target->send(_data, _size, _flush); + if (is_notification) { + method_t its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN], + _data[VSOMEIP_METHOD_POS_MAX]); + std::shared_ptr<event> its_event = find_event(its_service, _instance, its_method); + if (its_event) { + for (auto its_group : its_event->get_eventgroups()) { + // local + auto its_local_clients = find_local_clients(its_service, _instance, its_group); + for (auto its_local_client : its_local_clients) { + std::shared_ptr<endpoint> its_local_target = find_local(its_local_client); + if (its_target) { + its_local_target->send(_data, _size); + } + } + + // remote + auto its_eventgroup = find_eventgroup(its_service, _instance, its_group); + if (its_eventgroup) { + for (auto its_remote : its_eventgroup->get_targets()) { + its_target->send_to(its_remote, _data, _size); + } + } + } + } + } else { + is_sent = its_target->send(_data, _size, _flush); + } } else { VSOMEIP_ERROR - << "Routing error. Service endpoint could not be found!"; + << "Routing error. Service endpoint could not be found!"; } } else { VSOMEIP_ERROR << "Routing error. Service could not be found!"; @@ -310,7 +329,7 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, } } - return is_sent; + return (is_sent); } void routing_manager_impl::set(client_t its_client, service_t _service, @@ -320,22 +339,24 @@ void routing_manager_impl::set(client_t its_client, service_t _service, if (its_event) { its_event->set_payload(_payload); } else { - VSOMEIP_ERROR << "Event [" << std::hex << std::setw(4) << std::setfill('0') - << _service << "." << std::setw(4) << std::setfill('0') << _instance - << "." << std::setw(4) << std::setfill('0') << _event - << "] is unknown!"; + VSOMEIP_ERROR<< "Event [" << std::hex << std::setw(4) << std::setfill('0') + << _service << "." << std::setw(4) << std::setfill('0') << _instance + << "." << std::setw(4) << std::setfill('0') << _event + << "] is unknown!"; } } -bool routing_manager_impl::send_subscribe( - const boost::asio::ip::address &_address, uint16_t _port, bool _reliable, - const byte_t *_data, uint32_t _size) { +bool routing_manager_impl::send_to(const boost::asio::ip::address &_address, + uint16_t _port, bool _reliable, + const byte_t *_data, uint32_t _size) { std::shared_ptr<endpoint> its_endpoint = find_server_endpoint(_port, _reliable); if (its_endpoint) - return its_endpoint->send_to(_address, _port, _data, _size); + return (its_endpoint->send_to( + std::make_shared<endpoint_definition>( + _address, _port, _reliable), _data, _size)); - return false; + return (false); } void routing_manager_impl::on_message(const byte_t *_data, length_t _size, @@ -364,16 +385,16 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size, _data[VSOMEIP_CLIENT_POS_MAX]); } - if (its_client == host_->get_client()) { + if (its_client == host_->get_client() || utility::is_notification(_data)) { deliver_message(_data, _size, its_instance); } else if (its_client != VSOMEIP_ROUTING_CLIENT) { send(its_client, _data, _size, its_instance, true, false); } else { - VSOMEIP_ERROR << "Could not determine target application!"; + VSOMEIP_ERROR << "Cannot determine target application!"; } } else { - VSOMEIP_ERROR << "Could not determine service instance for [" - << its_service << "]"; + VSOMEIP_ERROR + << "Cannot determine service instance for [" << its_service << "]"; } } } else { @@ -426,7 +447,7 @@ bool routing_manager_impl::deliver_message(const byte_t *_data, length_t _size, // TODO: send error "Malformed Message" //send_error(); } - return is_sent; + return (is_sent); } bool routing_manager_impl::is_available(service_t _service, @@ -435,7 +456,7 @@ bool routing_manager_impl::is_available(service_t _service, if (find_local_service != local_services_.end()) { auto find_local_instance = find_local_service->second.find(_instance); if (find_local_instance != find_local_service->second.end()) { - return true; + return (true); } } @@ -443,16 +464,16 @@ bool routing_manager_impl::is_available(service_t _service, if (find_remote_service != remote_services_.end()) { auto find_remote_instance = find_remote_service->second.find(_instance); if (find_remote_instance != find_remote_service->second.end()) { - return true; + return (true); } } - return false; + return (false); } const std::map<std::string, std::shared_ptr<servicegroup> > & routing_manager_impl::get_servicegroups() const { - return servicegroups_; + return (servicegroups_); } std::shared_ptr<eventgroupinfo> routing_manager_impl::find_eventgroup( @@ -482,11 +503,11 @@ std::shared_ptr<eventgroupinfo> routing_manager_impl::find_eventgroup( } } } - return its_info; + return (its_info); } std::shared_ptr<configuration> routing_manager_impl::get_configuration() const { - return host_->get_configuration(); + return (host_->get_configuration()); } void routing_manager_impl::create_service_discovery_endpoint( @@ -515,10 +536,10 @@ services_t routing_manager_impl::get_offered_services( services_t its_offers; auto find_servicegroup = servicegroups_.find(_name); if (find_servicegroup != servicegroups_.end()) { - return find_servicegroup->second->get_services(); + return (find_servicegroup->second->get_services()); } - return its_offers; + return (its_offers); } /////////////////////////////////////////////////////////////////////////////// @@ -534,7 +555,7 @@ std::shared_ptr<serviceinfo> routing_manager_impl::find_service( its_info = found_instance->second; } } - return its_info; + return (its_info); } std::shared_ptr<serviceinfo> routing_manager_impl::create_service( @@ -556,16 +577,12 @@ std::shared_ptr<serviceinfo> routing_manager_impl::create_service( its_info->set_multicast_group( configuration_->get_multicast_group(_service, _instance)); - VSOMEIP_DEBUG << "MULTICAST [" - << std::hex << _service << "." << _instance << "] --> " - << its_info->get_multicast_group(); - std::shared_ptr<endpoint> its_reliable_endpoint; std::shared_ptr<endpoint> its_unreliable_endpoint; if (ILLEGAL_PORT != its_reliable_port) { its_reliable_endpoint = find_or_create_server_endpoint(its_reliable_port, - true); + true); its_info->set_endpoint(its_reliable_endpoint, true); // TODO: put this in a method and check whether an assignment already exists! @@ -586,12 +603,11 @@ std::shared_ptr<serviceinfo> routing_manager_impl::create_service( _instance); auto found_servicegroup = servicegroups_.find(its_servicegroup); if (found_servicegroup == servicegroups_.end()) { - servicegroups_[its_servicegroup] = std::make_shared < servicegroup - > (its_servicegroup); + servicegroups_[its_servicegroup] + = std::make_shared<servicegroup>(its_servicegroup); } - - servicegroups_[its_servicegroup]->add_service(_service, _instance, - its_info); + servicegroups_[its_servicegroup]->add_service( + _service, _instance, its_info); services_[_service][_instance] = its_info; } else { host_->on_error(error_code_e::PORT_CONFIGURATION_MISSING); @@ -600,7 +616,7 @@ std::shared_ptr<serviceinfo> routing_manager_impl::create_service( host_->on_error(error_code_e::CONFIGURATION_MISSING); } - return its_info; + return (its_info); } std::shared_ptr<endpoint> routing_manager_impl::create_client_endpoint( @@ -628,7 +644,7 @@ std::shared_ptr<endpoint> routing_manager_impl::create_client_endpoint( host_->on_error(error_code_e::CLIENT_ENDPOINT_CREATION_FAILED); } - return its_endpoint; + return (its_endpoint); } std::shared_ptr<endpoint> routing_manager_impl::find_client_endpoint( @@ -644,7 +660,7 @@ std::shared_ptr<endpoint> routing_manager_impl::find_client_endpoint( } } } - return its_endpoint; + return (its_endpoint); } std::shared_ptr<endpoint> routing_manager_impl::find_or_create_client_endpoint( @@ -655,7 +671,7 @@ std::shared_ptr<endpoint> routing_manager_impl::find_or_create_client_endpoint( if (0 == its_endpoint) { its_endpoint = create_client_endpoint(_address, _port, _reliable); } - return its_endpoint; + return (its_endpoint); } std::shared_ptr<endpoint> routing_manager_impl::create_server_endpoint( @@ -689,7 +705,7 @@ std::shared_ptr<endpoint> routing_manager_impl::create_server_endpoint( host_->on_error(error_code_e::SERVER_ENDPOINT_CREATION_FAILED); } - return its_endpoint; + return (its_endpoint); } std::shared_ptr<endpoint> routing_manager_impl::find_server_endpoint( @@ -702,7 +718,7 @@ std::shared_ptr<endpoint> routing_manager_impl::find_server_endpoint( its_endpoint = found_endpoint->second; } } - return its_endpoint; + return (its_endpoint); } std::shared_ptr<endpoint> routing_manager_impl::find_or_create_server_endpoint( @@ -713,7 +729,7 @@ std::shared_ptr<endpoint> routing_manager_impl::find_or_create_server_endpoint( if (0 == its_endpoint) { its_endpoint = create_server_endpoint(_port, _reliable); } - return its_endpoint; + return (its_endpoint); } std::shared_ptr<endpoint> routing_manager_impl::find_local(client_t _client) { @@ -723,7 +739,7 @@ std::shared_ptr<endpoint> routing_manager_impl::find_local(client_t _client) { if (found_endpoint != local_clients_.end()) { its_endpoint = found_endpoint->second; } - return its_endpoint; + return (its_endpoint); } std::shared_ptr<endpoint> routing_manager_impl::create_local(client_t _client) { @@ -738,7 +754,7 @@ std::shared_ptr<endpoint> routing_manager_impl::create_local(client_t _client) { its_path.str()), io_); local_clients_[_client] = its_endpoint; its_endpoint->start(); - return its_endpoint; + return (its_endpoint); } std::shared_ptr<endpoint> routing_manager_impl::find_or_create_local( @@ -747,7 +763,7 @@ std::shared_ptr<endpoint> routing_manager_impl::find_or_create_local( if (!its_endpoint) { its_endpoint = create_local(_client); } - return its_endpoint; + return (its_endpoint); } void routing_manager_impl::remove_local(client_t _client) { @@ -759,7 +775,7 @@ void routing_manager_impl::remove_local(client_t _client) { std::shared_ptr<endpoint> routing_manager_impl::find_local( service_t _service, instance_t _instance) { - return find_local(find_local_client(_service, _instance)); + return (find_local(find_local_client(_service, _instance))); } client_t routing_manager_impl::find_local_client(service_t _service, @@ -772,7 +788,23 @@ client_t routing_manager_impl::find_local_client(service_t _service, its_client = found_instance->second; } } - return its_client; + return (its_client); +} + +std::set<client_t> routing_manager_impl::find_local_clients( + service_t _service, instance_t _instance, eventgroup_t _eventgroup) { + std::set<client_t> its_clients; + auto found_service = eventgroup_clients_.find(_service); + if (found_service != eventgroup_clients_.end()) { + auto found_instance = found_service->second.find(_instance); + if (found_instance != found_service->second.end()) { + auto found_eventgroup = found_instance->second.find(_eventgroup); + if (found_eventgroup != found_instance->second.end()) { + its_clients = found_eventgroup->second; + } + } + } + return (its_clients); } instance_t routing_manager_impl::find_instance(service_t _service, @@ -785,7 +817,7 @@ instance_t routing_manager_impl::find_instance(service_t _service, its_instance = found_endpoint->second; } } - return its_instance; + return (its_instance); } std::shared_ptr<endpoint> routing_manager_impl::find_remote_client( @@ -801,7 +833,7 @@ std::shared_ptr<endpoint> routing_manager_impl::find_remote_client( } } } - return its_endpoint; + return (its_endpoint); } std::shared_ptr<event> routing_manager_impl::find_event(service_t _service, @@ -818,23 +850,23 @@ std::shared_ptr<event> routing_manager_impl::find_event(service_t _service, } } } - return its_event; + return (its_event); } std::set<std::shared_ptr<event> > routing_manager_impl::find_events( service_t _service, instance_t _instance, eventgroup_t _eventgroup) { - std::set < std::shared_ptr<event> > its_events; + std::set<std::shared_ptr<event> > its_events; auto found_service = eventgroups_.find(_service); if (found_service != eventgroups_.end()) { auto found_instance = found_service->second.find(_instance); if (found_instance != found_service->second.end()) { auto found_eventgroup = found_instance->second.find(_eventgroup); if (found_eventgroup != found_instance->second.end()) { - return found_eventgroup->second->get_events(); + return (found_eventgroup->second->get_events()); } } } - return its_events; + return (its_events); } void routing_manager_impl::add_routing_info( @@ -870,6 +902,17 @@ void routing_manager_impl::del_routing_info(service_t _service, host_->on_availability(_service, _instance, false); stub_->on_stop_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance); + // Implicit unsubscribe + auto found_service = eventgroups_.find(_service); + if (found_service != eventgroups_.end()) { + auto found_instance = found_service->second.find(_instance); + if (found_instance != found_service->second.end()) { + for (auto &its_eventgroup : found_instance->second) { + its_eventgroup.second->clear_targets(); + } + } + } + std::shared_ptr<endpoint> its_endpoint = its_info->get_endpoint(_reliable); if (its_endpoint) { if (1 >= service_instances_[_service].size()) { @@ -903,27 +946,26 @@ void routing_manager_impl::del_routing_info(service_t _service, } void routing_manager_impl::init_routing_info() { - VSOMEIP_INFO - << "Service Discovery disabled. Using static routing information."; + VSOMEIP_INFO<< "Service Discovery disabled. Using static routing information."; for (auto i : configuration_->get_remote_services()) { std::string its_address = configuration_->get_unicast(i.first, i.second); uint16_t its_reliable = configuration_->get_reliable_port(i.first, - i.second); + i.second); uint16_t its_unreliable = configuration_->get_unreliable_port(i.first, - i.second); + i.second); if (VSOMEIP_INVALID_PORT != its_reliable) { add_routing_info(i.first, i.second, DEFAULT_MAJOR, DEFAULT_MINOR, - DEFAULT_TTL, - boost::asio::ip::address::from_string(its_address), - its_reliable, true); + DEFAULT_TTL, + boost::asio::ip::address::from_string(its_address), + its_reliable, true); } if (VSOMEIP_INVALID_PORT != its_unreliable) { add_routing_info(i.first, i.second, DEFAULT_MAJOR, DEFAULT_MINOR, - DEFAULT_TTL, - boost::asio::ip::address::from_string(its_address), - its_unreliable, false); + DEFAULT_TTL, + boost::asio::ip::address::from_string(its_address), + its_unreliable, false); } } } @@ -959,6 +1001,7 @@ void routing_manager_impl::init_event_routing_info() { std::shared_ptr<event> its_event = find_event(i.first, j.first, l); if (its_event) { eventgroups_[i.first][j.first][k.first]->add_event(its_event); + its_event->add_eventgroup(k.first); } } } @@ -969,12 +1012,12 @@ void routing_manager_impl::init_event_routing_info() { for (auto i : eventgroups_) { for (auto j : i.second) { for (auto k : j.second) { - VSOMEIP_DEBUG << "Eventgroup [" << std::hex << std::setw(4) - << std::setfill('0') << i.first << "." << j.first << "." << k.first - << "] (MC: "; + VSOMEIP_DEBUG<< "Eventgroup [" << std::hex << std::setw(4) + << std::setfill('0') << i.first << "." << j.first << "." << k.first + << "] (MC: "; for (auto l : k.second->get_events()) { VSOMEIP_DEBUG << " Event " << std::hex << std::setw(4) - << std::setfill('0') << l->get_event(); + << std::setfill('0') << l->get_event(); } } } @@ -982,23 +1025,44 @@ void routing_manager_impl::init_event_routing_info() { #endif } -void routing_manager_impl::add_subscription( +void routing_manager_impl::on_subscribe( service_t _service, instance_t _instance, eventgroup_t _eventgroup, - const boost::asio::ip::address &_address, uint16_t _reliable_port, - uint16_t _unreliable_port) { - - VSOMEIP_INFO << "ADDING A SUBSCRIPTION FOR EVENT [" << std::hex - << std::setw(4) << std::setfill('0') << _service << "." << _instance - << "." << _eventgroup << "] for " << _address.to_string() << ":" - << std::dec << _unreliable_port; - + std::shared_ptr<endpoint_definition> _target) { + std::shared_ptr<eventgroupinfo> its_eventgroup + = find_eventgroup(_service, _instance, _eventgroup); + if (its_eventgroup) { + its_eventgroup->add_target(_target); + } else { + VSOMEIP_ERROR<< "subscribe: attempt to subscribe to unknown eventgroup!"; + } } -void routing_manager_impl::del_subscription( +void routing_manager_impl::on_unsubscribe( service_t _service, instance_t _instance, eventgroup_t _eventgroup, - const boost::asio::ip::address &_address, uint16_t _reliable_port, - uint16_t _unreliable_port) { + std::shared_ptr<endpoint_definition> _target) { + std::shared_ptr<eventgroupinfo> its_eventgroup + = find_eventgroup(_service, _instance, _eventgroup); + if (its_eventgroup) { + its_eventgroup->del_target(_target); + } else { + VSOMEIP_ERROR<<"unsubscribe: attempt to subscribe to unknown eventgroup!"; + } +} +void routing_manager_impl::on_subscribe_ack( + service_t _service, instance_t _instance, + const boost::asio::ip::address &_address, uint16_t _port) { + // TODO: find a way of getting rid of the following endpoint in + // case it used for multicast exclusively and the subscription + // is withdrawn or the service got lost + std::shared_ptr<endpoint> its_endpoint + = find_or_create_server_endpoint(_port, false); + if (its_endpoint) { + service_instances_[_service][its_endpoint.get()] = _instance; + its_endpoint->join(_address.to_string()); + } else { + VSOMEIP_ERROR << "Could not find/create multicast endpoint!"; + } } -} // namespace vsomeip +} // namespace vsomeip diff --git a/implementation/routing/src/routing_manager_proxy.cpp b/implementation/routing/src/routing_manager_proxy.cpp index dc6de94..4729e27 100644 --- a/implementation/routing/src/routing_manager_proxy.cpp +++ b/implementation/routing/src/routing_manager_proxy.cpp @@ -26,479 +26,517 @@ namespace vsomeip { routing_manager_proxy::routing_manager_proxy(routing_manager_host *_host) - : io_(_host->get_io()), - host_(_host), - client_(_host->get_client()), - sender_(0), - receiver_(0), - serializer_(std::make_shared< serializer>()), - deserializer_(std::make_shared< deserializer >()) { + : io_(_host->get_io()), + host_(_host), + client_(_host->get_client()), + sender_(0), + receiver_(0), + serializer_(std::make_shared<serializer>()), + deserializer_(std::make_shared<deserializer>()) { } routing_manager_proxy::~routing_manager_proxy() { } boost::asio::io_service & routing_manager_proxy::get_io() { - return io_; + return io_; } void routing_manager_proxy::init() { - uint32_t its_max_message_size = VSOMEIP_MAX_LOCAL_MESSAGE_SIZE; - if (VSOMEIP_MAX_TCP_MESSAGE_SIZE > its_max_message_size) - its_max_message_size = VSOMEIP_MAX_TCP_MESSAGE_SIZE; - if (VSOMEIP_MAX_UDP_MESSAGE_SIZE > its_max_message_size) - its_max_message_size = VSOMEIP_MAX_UDP_MESSAGE_SIZE; + uint32_t its_max_message_size = VSOMEIP_MAX_LOCAL_MESSAGE_SIZE; + if (VSOMEIP_MAX_TCP_MESSAGE_SIZE > its_max_message_size) + its_max_message_size = VSOMEIP_MAX_TCP_MESSAGE_SIZE; + if (VSOMEIP_MAX_UDP_MESSAGE_SIZE > its_max_message_size) + its_max_message_size = VSOMEIP_MAX_UDP_MESSAGE_SIZE; - serializer_->create_data(its_max_message_size); + serializer_->create_data(its_max_message_size); - std::stringstream its_sender_path; - sender_ = create_local(VSOMEIP_ROUTING_CLIENT); + std::stringstream its_sender_path; + sender_ = create_local(VSOMEIP_ROUTING_CLIENT); - std::stringstream its_client; - its_client << base_path << std::hex << client_; + std::stringstream its_client; + its_client << base_path << std::hex << client_; - ::unlink(its_client.str().c_str()); - receiver_ = std::make_shared< local_server_endpoint_impl >( - shared_from_this(), - boost::asio::local::stream_protocol::endpoint(its_client.str()), - io_); + ::unlink(its_client.str().c_str()); + receiver_ = std::make_shared<local_server_endpoint_impl>( + shared_from_this(), + boost::asio::local::stream_protocol::endpoint(its_client.str()), io_); - VSOMEIP_DEBUG << "Listening at " << its_client.str(); + VSOMEIP_DEBUG<< "Listening at " << its_client.str(); } void routing_manager_proxy::start() { - if (sender_) - sender_->start(); + if (sender_) + sender_->start(); - if (receiver_) - receiver_->start(); + if (receiver_) + receiver_->start(); - // Tell the stub where to find us - register_application(); + // Tell the stub where to find us + register_application(); } void routing_manager_proxy::stop() { - // Tell the stub we are no longer active - deregister_application(); + // Tell the stub we are no longer active + deregister_application(); - if (receiver_) - receiver_->stop(); + if (receiver_) + receiver_->stop(); - std::stringstream its_client; - its_client << base_path << std::hex << client_; - ::unlink(its_client.str().c_str()); + std::stringstream its_client; + its_client << base_path << std::hex << client_; + ::unlink(its_client.str().c_str()); } -void routing_manager_proxy::offer_service(client_t _client, - service_t _service, instance_t _instance, - major_version_t _major, minor_version_t _minor, ttl_t _ttl) { - - byte_t its_command[VSOMEIP_OFFER_SERVICE_COMMAND_SIZE]; - uint32_t its_size = VSOMEIP_OFFER_SERVICE_COMMAND_SIZE - VSOMEIP_COMMAND_HEADER_SIZE; - - its_command[VSOMEIP_COMMAND_TYPE_POS] = VSOMEIP_OFFER_SERVICE; - std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &client_, sizeof(client_)); - std::memcpy(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], &its_size, sizeof(its_size)); - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS], &_service, sizeof(_service)); - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS+2], &_instance, sizeof(_instance)); - its_command[VSOMEIP_COMMAND_PAYLOAD_POS+4] = _major; - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS+5], &_minor, sizeof(_minor)); - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS+9], &_ttl, sizeof(_ttl)); - - if (sender_) - sender_->send(its_command, sizeof(its_command)); +void routing_manager_proxy::offer_service(client_t _client, service_t _service, + instance_t _instance, + major_version_t _major, + minor_version_t _minor, ttl_t _ttl) { + + byte_t its_command[VSOMEIP_OFFER_SERVICE_COMMAND_SIZE]; + uint32_t its_size = VSOMEIP_OFFER_SERVICE_COMMAND_SIZE + - VSOMEIP_COMMAND_HEADER_SIZE; + + its_command[VSOMEIP_COMMAND_TYPE_POS] = VSOMEIP_OFFER_SERVICE; + std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &client_, + sizeof(client_)); + std::memcpy(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], &its_size, + sizeof(its_size)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS], &_service, + sizeof(_service)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 2], &_instance, + sizeof(_instance)); + its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4] = _major; + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 5], &_minor, + sizeof(_minor)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 9], &_ttl, + sizeof(_ttl)); + + if (sender_) + sender_->send(its_command, sizeof(its_command)); } void routing_manager_proxy::stop_offer_service(client_t _client, - service_t _service, instance_t _instance) { - - byte_t its_command[VSOMEIP_STOP_OFFER_SERVICE_COMMAND_SIZE]; - uint32_t its_size = VSOMEIP_STOP_OFFER_SERVICE_COMMAND_SIZE - VSOMEIP_COMMAND_HEADER_SIZE; - - its_command[VSOMEIP_COMMAND_TYPE_POS] = VSOMEIP_STOP_OFFER_SERVICE; - std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &client_, sizeof(client_)); - std::memcpy(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], &its_size, sizeof(its_size)); - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS], &_service, sizeof(_service)); - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS+2], &_instance, sizeof(_instance)); - - if (sender_) - sender_->send(its_command, sizeof(its_command)); -} - -void routing_manager_proxy::publish_eventgroup(client_t _client, - service_t _service, instance_t _instance, eventgroup_t _eventgroup, - major_version_t _major, ttl_t _ttl) { - - byte_t its_command[VSOMEIP_PUBLISH_EVENTGROUP_COMMAND_SIZE]; - uint32_t its_size = VSOMEIP_PUBLISH_EVENTGROUP_COMMAND_SIZE - VSOMEIP_COMMAND_HEADER_SIZE; - - its_command[VSOMEIP_COMMAND_TYPE_POS] = VSOMEIP_PUBLISH_EVENTGROUP; - std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &client_, sizeof(client_)); - std::memcpy(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], &its_size, sizeof(its_size)); - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS], &_service, sizeof(_service)); - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS+2], &_instance, sizeof(_instance)); - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS+4], &_eventgroup, sizeof(_eventgroup)); - its_command[VSOMEIP_COMMAND_PAYLOAD_POS+6] = _major; - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS+7], &_ttl, sizeof(_ttl)); - - if (sender_) - sender_->send(its_command, sizeof(its_command)); -} - -void routing_manager_proxy::stop_publish_eventgroup(client_t _client, - service_t _service, instance_t _instance, eventgroup_t _eventgroup) { - byte_t its_command[VSOMEIP_STOP_PUBLISH_EVENTGROUP_COMMAND_SIZE]; - uint32_t its_size = VSOMEIP_STOP_PUBLISH_EVENTGROUP_COMMAND_SIZE - VSOMEIP_COMMAND_HEADER_SIZE; - - its_command[VSOMEIP_COMMAND_TYPE_POS] = VSOMEIP_PUBLISH_EVENTGROUP; - std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &client_, sizeof(client_)); - std::memcpy(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], &its_size, sizeof(its_size)); - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS], &_service, sizeof(_service)); - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS+2], &_instance, sizeof(_instance)); - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS+4], &_eventgroup, sizeof(_eventgroup)); - - if (sender_) - sender_->send(its_command, sizeof(its_command)); + service_t _service, + instance_t _instance) { + + byte_t its_command[VSOMEIP_STOP_OFFER_SERVICE_COMMAND_SIZE]; + uint32_t its_size = VSOMEIP_STOP_OFFER_SERVICE_COMMAND_SIZE + - VSOMEIP_COMMAND_HEADER_SIZE; + + its_command[VSOMEIP_COMMAND_TYPE_POS] = VSOMEIP_STOP_OFFER_SERVICE; + std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &client_, + sizeof(client_)); + std::memcpy(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], &its_size, + sizeof(its_size)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS], &_service, + sizeof(_service)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 2], &_instance, + sizeof(_instance)); + + if (sender_) + sender_->send(its_command, sizeof(its_command)); } void routing_manager_proxy::request_service(client_t _client, - service_t _service, instance_t _instance, - major_version_t _major, minor_version_t _minor, ttl_t _ttl) { + service_t _service, + instance_t _instance, + major_version_t _major, + minor_version_t _minor, + ttl_t _ttl) { } void routing_manager_proxy::release_service(client_t _client, - service_t _service, instance_t _instance) { + service_t _service, + instance_t _instance) { } -void routing_manager_proxy::subscribe(client_t _client, - service_t _service, instance_t _instance, eventgroup_t _eventgroup, major_version_t _major, ttl_t _ttl) { +void routing_manager_proxy::subscribe(client_t _client, service_t _service, + instance_t _instance, + eventgroup_t _eventgroup, + major_version_t _major, ttl_t _ttl) { + byte_t its_command[VSOMEIP_SUBSCRIBE_COMMAND_SIZE]; + uint32_t its_size = VSOMEIP_SUBSCRIBE_COMMAND_SIZE + - VSOMEIP_COMMAND_HEADER_SIZE; + + its_command[VSOMEIP_COMMAND_TYPE_POS] = VSOMEIP_SUBSCRIBE; + std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &client_, + sizeof(client_)); + std::memcpy(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], &its_size, + sizeof(its_size)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS], &_service, + sizeof(_service)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 2], &_instance, + sizeof(_instance)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_eventgroup, + sizeof(_eventgroup)); + its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 6] = _major; + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 7], &_ttl, + sizeof(_ttl)); + + if (sender_) + sender_->send(its_command, sizeof(its_command)); } -void routing_manager_proxy::unsubscribe(client_t _client, - service_t _service, instance_t _instance, eventgroup_t _eventgroup) { +void routing_manager_proxy::unsubscribe(client_t _client, service_t _service, + instance_t _instance, + eventgroup_t _eventgroup) { + byte_t its_command[VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE]; + uint32_t its_size = VSOMEIP_UNSUBSCRIBE_COMMAND_SIZE + - VSOMEIP_COMMAND_HEADER_SIZE; + + its_command[VSOMEIP_COMMAND_TYPE_POS] = VSOMEIP_UNSUBSCRIBE; + std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &client_, + sizeof(client_)); + std::memcpy(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], &its_size, + sizeof(its_size)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS], &_service, + sizeof(_service)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 2], &_instance, + sizeof(_instance)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + 4], &_eventgroup, + sizeof(_eventgroup)); + + if (sender_) + sender_->send(its_command, sizeof(its_command)); } bool routing_manager_proxy::send(client_t its_client, - std::shared_ptr< message > _message, - bool _flush, bool _reliable) { - bool is_sent(false); - std::unique_lock< std::mutex > its_lock(serialize_mutex_); - if (serializer_->serialize(_message.get())) { - is_sent = send(its_client, serializer_->get_data(), serializer_->get_size(), _message->get_instance(), _flush, _reliable); - serializer_->reset(); - } - return is_sent; + std::shared_ptr<message> _message, + bool _flush, + bool _reliable) { + bool is_sent(false); + std::unique_lock<std::mutex> its_lock(serialize_mutex_); + if (serializer_->serialize(_message.get())) { + is_sent = send(its_client, serializer_->get_data(), serializer_->get_size(), + _message->get_instance(), _flush, _reliable); + serializer_->reset(); + } + return is_sent; } -bool routing_manager_proxy::send(client_t _client, - const byte_t *_data, length_t _size, - instance_t _instance, - bool _flush, bool _reliable) { - bool is_sent(false); - std::shared_ptr< endpoint > its_target; - - if (_size > VSOMEIP_MESSAGE_TYPE_POS) { - if (is_request(_data[VSOMEIP_MESSAGE_TYPE_POS])) { - service_t its_service = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]); - std::unique_lock< std::mutex > its_lock(send_mutex_); - its_target = find_local(its_service, _instance); - } else { - client_t its_client = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_CLIENT_POS_MIN], _data[VSOMEIP_CLIENT_POS_MAX]); - std::unique_lock< std::mutex > its_lock(send_mutex_); - its_target = find_local(its_client); - } - - // If no direct endpoint could be found, route to stub - if (!its_target) - its_target = sender_; - - std::vector< byte_t > its_command(VSOMEIP_COMMAND_HEADER_SIZE + _size + - sizeof(instance_t) + sizeof(bool) + sizeof(bool)); - its_command[VSOMEIP_COMMAND_TYPE_POS] = VSOMEIP_SEND; - std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &_client, sizeof(client_t)); - std::memcpy(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], &_size, sizeof(_size)); - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS], _data, _size); - std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + _size], &_instance, sizeof(instance_t)); - its_command[VSOMEIP_COMMAND_PAYLOAD_POS + _size + sizeof(instance_t)] = _flush; - its_command[VSOMEIP_COMMAND_PAYLOAD_POS + _size + sizeof(instance_t) + sizeof(bool)] = _reliable; +bool routing_manager_proxy::send(client_t _client, const byte_t *_data, + length_t _size, instance_t _instance, + bool _flush, + bool _reliable) { + bool is_sent(false); + std::shared_ptr<endpoint> its_target; + + if (_size > VSOMEIP_MESSAGE_TYPE_POS) { + if (is_request(_data[VSOMEIP_MESSAGE_TYPE_POS])) { + service_t its_service = VSOMEIP_BYTES_TO_WORD( + _data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]); + std::unique_lock<std::mutex> its_lock(send_mutex_); + its_target = find_local(its_service, _instance); + } else { + client_t its_client = VSOMEIP_BYTES_TO_WORD( + _data[VSOMEIP_CLIENT_POS_MIN], _data[VSOMEIP_CLIENT_POS_MAX]); + std::unique_lock<std::mutex> its_lock(send_mutex_); + its_target = find_local(its_client); + } + + // If no direct endpoint could be found, route to stub + if (!its_target) + its_target = sender_; + + std::vector<byte_t> its_command( + VSOMEIP_COMMAND_HEADER_SIZE + _size + sizeof(instance_t) + sizeof(bool) + + sizeof(bool)); + its_command[VSOMEIP_COMMAND_TYPE_POS] = VSOMEIP_SEND; + std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &_client, + sizeof(client_t)); + std::memcpy(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], &_size, + sizeof(_size)); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS], _data, _size); + std::memcpy(&its_command[VSOMEIP_COMMAND_PAYLOAD_POS + _size], &_instance, + sizeof(instance_t)); + its_command[VSOMEIP_COMMAND_PAYLOAD_POS + _size + sizeof(instance_t)] = + _flush; + its_command[VSOMEIP_COMMAND_PAYLOAD_POS + _size + sizeof(instance_t) + + sizeof(bool)] = _reliable; #if 0 - std::cout << "rmp:send: "; - for (int i = 0; i < its_command.size(); i++) - std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)its_command[i] << " "; - std::cout << std::endl; + std::cout << "rmp:send: "; + for (int i = 0; i < its_command.size(); i++) + std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)its_command[i] << " "; + std::cout << std::endl; #endif - is_sent = its_target->send(&its_command[0], its_command.size()); - } - return is_sent; + is_sent = its_target->send(&its_command[0], its_command.size()); + } + return is_sent; } -void routing_manager_proxy::set(client_t _client, - service_t _service, instance_t _instance, - event_t _event, const std::shared_ptr<payload> &_value) { +void routing_manager_proxy::set(client_t _client, service_t _service, + instance_t _instance, event_t _event, + const std::shared_ptr<payload> &_value) { } -void routing_manager_proxy::on_connect(std::shared_ptr< endpoint > _endpoint) { +void routing_manager_proxy::on_connect(std::shared_ptr<endpoint> _endpoint) { } -void routing_manager_proxy::on_disconnect(std::shared_ptr< endpoint > _endpoint) { +void routing_manager_proxy::on_disconnect(std::shared_ptr<endpoint> _endpoint) { } -void routing_manager_proxy::on_message( - const byte_t *_data, length_t _size, endpoint *_receiver) { +void routing_manager_proxy::on_message(const byte_t *_data, length_t _size, + endpoint *_receiver) { #if 0 - std::cout << "rmp::on_message: "; - for (int i = 0; i < _size; ++i) - std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " "; - std::cout << std::endl; + std::cout << "rmp::on_message: "; + for (int i = 0; i < _size; ++i) + std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " "; + std::cout << std::endl; #endif - byte_t its_command; - client_t its_client; - length_t its_length; - - if (_size > VSOMEIP_COMMAND_SIZE_POS_MAX) { - its_command = _data[VSOMEIP_COMMAND_TYPE_POS]; - std::memcpy(&its_client, &_data[VSOMEIP_COMMAND_CLIENT_POS], sizeof(its_client)); - std::memcpy(&its_length, &_data[VSOMEIP_COMMAND_SIZE_POS_MIN], sizeof(its_length)); - - switch (its_command) { - case VSOMEIP_SEND: - { - instance_t its_instance; - std::memcpy(&its_instance, - &_data[_size - sizeof(instance_t) - sizeof(bool) - sizeof(bool)], - sizeof(instance_t)); - deserializer_->set_data(&_data[VSOMEIP_COMMAND_PAYLOAD_POS], its_length); - std::shared_ptr< message > its_message(deserializer_->deserialize_message()); - if (its_message) { - its_message->set_instance(its_instance); - host_->on_message(its_message); - } else { - // TODO: send_error(return_code_e::E_MALFORMED_MESSAGE); - } - deserializer_->reset(); - } - break; - - case VSOMEIP_ROUTING_INFO: - on_routing_info(&_data[VSOMEIP_COMMAND_PAYLOAD_POS], its_length); - break; - - case VSOMEIP_PING: - send_pong(); - break; - - default: - break; - } - } + byte_t its_command; + client_t its_client; + length_t its_length; + + if (_size > VSOMEIP_COMMAND_SIZE_POS_MAX) { + its_command = _data[VSOMEIP_COMMAND_TYPE_POS]; + std::memcpy(&its_client, &_data[VSOMEIP_COMMAND_CLIENT_POS], + sizeof(its_client)); + std::memcpy(&its_length, &_data[VSOMEIP_COMMAND_SIZE_POS_MIN], + sizeof(its_length)); + + switch (its_command) { + case VSOMEIP_SEND: { + instance_t its_instance; + std::memcpy( + &its_instance, + &_data[_size - sizeof(instance_t) - sizeof(bool) - sizeof(bool)], + sizeof(instance_t)); + deserializer_->set_data(&_data[VSOMEIP_COMMAND_PAYLOAD_POS], + its_length); + std::shared_ptr<message> its_message( + deserializer_->deserialize_message()); + if (its_message) { + its_message->set_instance(its_instance); + host_->on_message(its_message); + } else { + // TODO: send_error(return_code_e::E_MALFORMED_MESSAGE); + } + deserializer_->reset(); + } + break; + + case VSOMEIP_ROUTING_INFO: + on_routing_info(&_data[VSOMEIP_COMMAND_PAYLOAD_POS], its_length); + break; + + case VSOMEIP_PING: + send_pong(); + break; + + default: + break; + } + } } -void routing_manager_proxy::on_routing_info(const byte_t *_data, uint32_t _size) { +void routing_manager_proxy::on_routing_info(const byte_t *_data, + uint32_t _size) { #if 0 - std::cout << "rmp::on_routing_info: "; - for (int i = 0; i < _size; ++i) - std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " "; - std::cout << std::endl; + std::cout << "rmp::on_routing_info: "; + for (int i = 0; i < _size; ++i) + std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " "; + std::cout << std::endl; #endif - event_type_e its_state(event_type_e::DEREGISTERED); - - std::map< service_t, std::map< instance_t, client_t > > old_local_services - = local_services_; - local_services_.clear(); - - uint32_t i = 0; - while (i + sizeof(uint32_t) <= _size) { - uint32_t its_client_size; - std::memcpy(&its_client_size, &_data[i], sizeof(uint32_t)); - i += sizeof(uint32_t); - - if (i + sizeof(client_t) <= _size) { - client_t its_client; - std::memcpy(&its_client, &_data[i], sizeof(client_t)); - i += sizeof(client_t); - - if (its_client != client_) { - (void)find_or_create_local(its_client); - } else { - its_state = event_type_e::REGISTERED; - } - - if (i + sizeof(uint32_t) <= _size) { - uint32_t its_services_size; - std::memcpy(&its_services_size, &_data[i], sizeof(uint32_t)); - i += sizeof(uint32_t); - - uint32_t j = 0; - while (j + sizeof(service_t) + sizeof(instance_t) <= its_services_size) { - service_t its_service; - std::memcpy(&its_service, &_data[i+j], sizeof(service_t)); - j += sizeof(service_t); - - instance_t its_instance; - std::memcpy(&its_instance, &_data[i+j], sizeof(instance_t)); - j += sizeof(instance_t); - - if (its_client != client_) - local_services_[its_service][its_instance] = its_client; - } - - i += its_services_size; - } - } - } - - // inform host about its own registration state - host_->on_event(its_state); - - // Check for services that are no longer available - for (auto i : old_local_services) { - auto found_service = local_services_.find(i.first); - if (found_service != local_services_.end()) { - for (auto j : i.second) { - auto found_instance = found_service->second.find(j.first); - if (found_instance == found_service->second.end()) { - host_->on_availability(i.first, j.first, false); - } - } - } else { - for (auto j : i.second) { - host_->on_availability(i.first, j.first, false); - } - } - } - - // Check for services that are newly available - for (auto i : local_services_) { - auto found_service = old_local_services.find(i.first); - if (found_service != old_local_services.end()) { - for (auto j : i.second) { - auto found_instance = found_service->second.find(j.first); - if (found_instance == found_service->second.end()) { - host_->on_availability(i.first, j.first, true); - } - } - } else { - for (auto j : i.second) { - host_->on_availability(i.first, j.first, true); - } - } - } + event_type_e its_state(event_type_e::DEREGISTERED); + + std::map<service_t, std::map<instance_t, client_t> > old_local_services = + local_services_; + local_services_.clear(); + + uint32_t i = 0; + while (i + sizeof(uint32_t) <= _size) { + uint32_t its_client_size; + std::memcpy(&its_client_size, &_data[i], sizeof(uint32_t)); + i += sizeof(uint32_t); + + if (i + sizeof(client_t) <= _size) { + client_t its_client; + std::memcpy(&its_client, &_data[i], sizeof(client_t)); + i += sizeof(client_t); + + if (its_client != client_) { + (void) find_or_create_local(its_client); + } else { + its_state = event_type_e::REGISTERED; + } + + if (i + sizeof(uint32_t) <= _size) { + uint32_t its_services_size; + std::memcpy(&its_services_size, &_data[i], sizeof(uint32_t)); + i += sizeof(uint32_t); + + uint32_t j = 0; + while (j + sizeof(service_t) + sizeof(instance_t) <= its_services_size) { + service_t its_service; + std::memcpy(&its_service, &_data[i + j], sizeof(service_t)); + j += sizeof(service_t); + + instance_t its_instance; + std::memcpy(&its_instance, &_data[i + j], sizeof(instance_t)); + j += sizeof(instance_t); + + if (its_client != client_) + local_services_[its_service][its_instance] = its_client; + } + + i += its_services_size; + } + } + } + + // inform host about its own registration state + host_->on_event(its_state); + + // Check for services that are no longer available + for (auto i : old_local_services) { + auto found_service = local_services_.find(i.first); + if (found_service != local_services_.end()) { + for (auto j : i.second) { + auto found_instance = found_service->second.find(j.first); + if (found_instance == found_service->second.end()) { + host_->on_availability(i.first, j.first, false); + } + } + } else { + for (auto j : i.second) { + host_->on_availability(i.first, j.first, false); + } + } + } + + // Check for services that are newly available + for (auto i : local_services_) { + auto found_service = old_local_services.find(i.first); + if (found_service != old_local_services.end()) { + for (auto j : i.second) { + auto found_instance = found_service->second.find(j.first); + if (found_instance == found_service->second.end()) { + host_->on_availability(i.first, j.first, true); + } + } + } else { + for (auto j : i.second) { + host_->on_availability(i.first, j.first, true); + } + } + } } -bool routing_manager_proxy::is_available(service_t _service, instance_t _instance) const { - auto found_local_service = local_services_.find(_service); - if (found_local_service != local_services_.end()) { - auto found_local_instance = found_local_service->second.find(_instance); - if (found_local_instance != found_local_service->second.end()) { - return true; - } - } - - // TODO: ask routing manager for external services - return false; +bool routing_manager_proxy::is_available(service_t _service, + instance_t _instance) const { + auto found_local_service = local_services_.find(_service); + if (found_local_service != local_services_.end()) { + auto found_local_instance = found_local_service->second.find(_instance); + if (found_local_instance != found_local_service->second.end()) { + return true; + } + } + + // TODO: ask routing manager for external services + return false; } void routing_manager_proxy::register_application() { - byte_t its_command[] = { - VSOMEIP_REGISTER_APPLICATION, - 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00 - }; + byte_t its_command[] = { + VSOMEIP_REGISTER_APPLICATION, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; - std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &client_, sizeof(client_)); - std::memset(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], 0, sizeof(uint32_t)); + std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &client_, + sizeof(client_)); + std::memset(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], 0, sizeof(uint32_t)); - if (sender_) - sender_->send(its_command, sizeof(its_command)); + if (sender_) + sender_->send(its_command, sizeof(its_command)); } void routing_manager_proxy::deregister_application() { - uint32_t its_size = sizeof(client_); - - std::vector< byte_t > its_command(VSOMEIP_COMMAND_HEADER_SIZE + its_size); - its_command[VSOMEIP_COMMAND_TYPE_POS] = VSOMEIP_DEREGISTER_APPLICATION; - std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &client_, sizeof(client_)); - std::memcpy(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], &its_size, sizeof(its_size)); - if (sender_) - sender_->send(&its_command[0], its_command.size()); + uint32_t its_size = sizeof(client_); + + std::vector<byte_t> its_command(VSOMEIP_COMMAND_HEADER_SIZE + its_size); + its_command[VSOMEIP_COMMAND_TYPE_POS] = VSOMEIP_DEREGISTER_APPLICATION; + std::memcpy(&its_command[VSOMEIP_COMMAND_CLIENT_POS], &client_, + sizeof(client_)); + std::memcpy(&its_command[VSOMEIP_COMMAND_SIZE_POS_MIN], &its_size, + sizeof(its_size)); + if (sender_) + sender_->send(&its_command[0], its_command.size()); } - -std::shared_ptr< endpoint > routing_manager_proxy::find_local(client_t _client) { - std::shared_ptr< endpoint > its_endpoint; - auto found_endpoint = local_endpoints_.find(_client); - if (found_endpoint != local_endpoints_.end()) { - its_endpoint = found_endpoint->second; - } - return its_endpoint; +std::shared_ptr<endpoint> routing_manager_proxy::find_local(client_t _client) { + std::shared_ptr<endpoint> its_endpoint; + auto found_endpoint = local_endpoints_.find(_client); + if (found_endpoint != local_endpoints_.end()) { + its_endpoint = found_endpoint->second; + } + return its_endpoint; } -std::shared_ptr< endpoint > routing_manager_proxy::create_local(client_t _client) { - std::stringstream its_path; - its_path << base_path << std::hex << _client; +std::shared_ptr<endpoint> routing_manager_proxy::create_local( + client_t _client) { + std::stringstream its_path; + its_path << base_path << std::hex << _client; - VSOMEIP_DEBUG << "Connecting to [" << std::hex << _client - << "] at " << its_path.str(); + VSOMEIP_DEBUG<< "Connecting to [" << std::hex << _client + << "] at " << its_path.str(); - std::shared_ptr< endpoint > its_endpoint - = std::make_shared< local_client_endpoint_impl >( - shared_from_this(), - boost::asio::local::stream_protocol::endpoint(its_path.str()), - io_); + std::shared_ptr<endpoint> its_endpoint = std::make_shared< + local_client_endpoint_impl>( + shared_from_this(), + boost::asio::local::stream_protocol::endpoint(its_path.str()), io_); - local_endpoints_[_client] = its_endpoint; + local_endpoints_[_client] = its_endpoint; - return its_endpoint; + return its_endpoint; } -std::shared_ptr< endpoint > routing_manager_proxy::find_or_create_local(client_t _client) { - std::shared_ptr< endpoint > its_endpoint(find_local(_client)); - if (0 == its_endpoint) { - its_endpoint = create_local(_client); - its_endpoint->start(); - } - return its_endpoint; +std::shared_ptr<endpoint> routing_manager_proxy::find_or_create_local( + client_t _client) { + std::shared_ptr<endpoint> its_endpoint(find_local(_client)); + if (0 == its_endpoint) { + its_endpoint = create_local(_client); + its_endpoint->start(); + } + return its_endpoint; } void routing_manager_proxy::remove_local(client_t _client) { - std::shared_ptr< endpoint > its_endpoint(find_local(_client)); - if (its_endpoint) - its_endpoint->stop(); - local_endpoints_.erase(_client); + std::shared_ptr<endpoint> its_endpoint(find_local(_client)); + if (its_endpoint) + its_endpoint->stop(); + local_endpoints_.erase(_client); } -std::shared_ptr< endpoint > routing_manager_proxy::find_local(service_t _service, instance_t _instance) { - client_t its_client(0); - auto found_service = local_services_.find(_service); - if (found_service != local_services_.end()) { - auto found_instance = found_service->second.find(_instance); - if (found_instance != found_service->second.end()) { - its_client = found_instance->second; - } - } - return find_local(its_client); +std::shared_ptr<endpoint> routing_manager_proxy::find_local( + service_t _service, instance_t _instance) { + client_t its_client(0); + auto found_service = local_services_.find(_service); + if (found_service != local_services_.end()) { + auto found_instance = found_service->second.find(_instance); + if (found_instance != found_service->second.end()) { + its_client = found_instance->second; + } + } + return find_local(its_client); } bool routing_manager_proxy::is_request(byte_t _message_type) const { - message_type_e its_type = static_cast< message_type_e >(_message_type); - return (its_type < message_type_e::NOTIFICATION || - its_type == message_type_e::REQUEST_ACK || - its_type == message_type_e::REQUEST_NO_RETURN_ACK); + message_type_e its_type = static_cast<message_type_e>(_message_type); + return (its_type < message_type_e::NOTIFICATION + || its_type == message_type_e::REQUEST_ACK + || its_type == message_type_e::REQUEST_NO_RETURN_ACK); } void routing_manager_proxy::send_pong() const { - byte_t its_pong[] = { - VSOMEIP_PONG, - 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, - }; + byte_t its_pong[] = { + VSOMEIP_PONG, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, }; - std::memcpy(&its_pong[VSOMEIP_COMMAND_CLIENT_POS], &client_, sizeof(client_t)); + std::memcpy(&its_pong[VSOMEIP_COMMAND_CLIENT_POS], &client_, + sizeof(client_t)); - if (sender_) - sender_->send(its_pong, sizeof(its_pong)); + if (sender_) + sender_->send(its_pong, sizeof(its_pong)); } -} // namespace vsomeip +} // namespace vsomeip diff --git a/implementation/routing/src/routing_manager_stub.cpp b/implementation/routing/src/routing_manager_stub.cpp index ba2a87e..c8dd00a 100644 --- a/implementation/routing/src/routing_manager_stub.cpp +++ b/implementation/routing/src/routing_manager_stub.cpp @@ -150,13 +150,13 @@ void routing_manager_stub::on_message(const byte_t *_data, length_t _size, on_stop_offer_service(its_client, its_service, its_instance); break; - case VSOMEIP_PUBLISH_EVENTGROUP: - routing_->publish_eventgroup(its_client, its_service, + case VSOMEIP_SUBSCRIBE: + routing_->subscribe(its_client, its_service, its_instance, its_eventgroup, its_major, its_ttl); break; - case VSOMEIP_STOP_PUBLISH_EVENTGROUP: - routing_->stop_publish_eventgroup(its_client, its_service, + case VSOMEIP_UNSUBSCRIBE: + routing_->unsubscribe(its_client, its_service, its_instance, its_eventgroup); break; diff --git a/implementation/runtime/include/application_impl.hpp b/implementation/runtime/include/application_impl.hpp index 6771789..b2faad0 100644 --- a/implementation/runtime/include/application_impl.hpp +++ b/implementation/runtime/include/application_impl.hpp @@ -39,12 +39,6 @@ public: void stop_offer_service(service_t _service, instance_t _instance); - void publish_eventgroup(service_t _service, instance_t _instance, - eventgroup_t _eventgroup, major_version_t _major, ttl_t _ttl); - - void stop_publish_eventgroup(service_t _service, instance_t _instance, - eventgroup_t _eventgroup); - // Consume services void request_service(service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor, ttl_t _ttl); diff --git a/implementation/runtime/src/application_impl.cpp b/implementation/runtime/src/application_impl.cpp index 8d9eb73..ba6ff38 100644 --- a/implementation/runtime/src/application_impl.cpp +++ b/implementation/runtime/src/application_impl.cpp @@ -128,21 +128,6 @@ void application_impl::stop_offer_service(service_t _service, routing_->stop_offer_service(client_, _service, _instance); } -void application_impl::publish_eventgroup(service_t _service, - instance_t _instance, eventgroup_t _eventgroup, major_version_t _major, - ttl_t _ttl) { - if (routing_) - routing_->publish_eventgroup(client_, _service, _instance, _eventgroup, - _major, _ttl); -} - -void application_impl::stop_publish_eventgroup(service_t _service, - instance_t _instance, eventgroup_t _eventgroup) { - if (routing_) - routing_->stop_publish_eventgroup(client_, _service, _instance, - _eventgroup); -} - void application_impl::request_service(service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor, ttl_t _ttl) { if (routing_) diff --git a/implementation/service_discovery/include/service_discovery_host.hpp b/implementation/service_discovery/include/service_discovery_host.hpp index dbd5a16..84e5297 100644 --- a/implementation/service_discovery/include/service_discovery_host.hpp +++ b/implementation/service_discovery/include/service_discovery_host.hpp @@ -19,6 +19,7 @@ namespace vsomeip { class configuration; class endpoint; +class endpoint_definition; namespace sd { @@ -41,9 +42,8 @@ class service_discovery_host { virtual bool send(client_t _client, std::shared_ptr<message> _message, bool _flush, bool _reliable) = 0; - virtual bool send_subscribe(const boost::asio::ip::address &_address, - uint16_t _port, bool _reliable, - const byte_t *_data, uint32_t _size) = 0; + virtual bool send_to(const boost::asio::ip::address &_address, uint16_t _port, + bool _reliable, const byte_t *_data, uint32_t _size) = 0; virtual void add_routing_info(service_t _service, instance_t _instance, major_version_t _major, minor_version_t _minor, @@ -54,17 +54,14 @@ class service_discovery_host { virtual void del_routing_info(service_t _service, instance_t _instance, bool _reliable) = 0; - virtual void add_subscription(service_t _service, instance_t _instance, - eventgroup_t _eventgroup, - const boost::asio::ip::address &_address, - uint16_t _reliable_port, - uint16_t _unreliable_port) = 0; + virtual void on_subscribe(service_t _service, instance_t _instance, eventgroup_t _eventgroup, + std::shared_ptr<endpoint_definition> _target) = 0; - virtual void del_subscription(service_t _service, instance_t _instance, - eventgroup_t _eventgroup, - const boost::asio::ip::address &_address, - uint16_t _reliable_port, - uint16_t _unreliable_port) = 0; + virtual void on_unsubscribe(service_t _service, instance_t _instance, eventgroup_t _eventgroup, + std::shared_ptr<endpoint_definition> _target) = 0; + + virtual void on_subscribe_ack(service_t _service, instance_t _instance, + const boost::asio::ip::address &_address, uint16_t _port) = 0; virtual std::shared_ptr<endpoint> find_remote_client(service_t _service, instance_t _instance, diff --git a/implementation/service_discovery/include/service_discovery_impl.hpp b/implementation/service_discovery/include/service_discovery_impl.hpp index 1393bdb..4279cbf 100644 --- a/implementation/service_discovery/include/service_discovery_impl.hpp +++ b/implementation/service_discovery/include/service_discovery_impl.hpp @@ -33,6 +33,8 @@ class service_discovery_fsm; class service_discovery_host; class subscription; +typedef std::map<service_t, std::map<instance_t, std::shared_ptr<request> > > requests_t; + class service_discovery_impl : public service_discovery, public std::enable_shared_from_this<service_discovery_impl> { public: @@ -61,12 +63,17 @@ class service_discovery_impl : public service_discovery, void on_message(const byte_t *_data, length_t _length); private: + session_t get_session(const boost::asio::ip::address &_address); + void increment_session(const boost::asio::ip::address &_address); + void insert_option(std::shared_ptr<message_impl> &_message, std::shared_ptr<entry_impl> _entry, const boost::asio::ip::address &_address, uint16_t _port, bool _is_reliable); - void insert_service_entries(std::shared_ptr<message_impl> &_message, - services_t &_services, bool _is_offer); + void insert_find_entries(std::shared_ptr<message_impl> &_message, + requests_t &_requests); + void insert_offer_entries(std::shared_ptr<message_impl> &_message, + services_t &_services); void insert_subscription(std::shared_ptr<message_impl> &_message, service_t _service, instance_t _instance, eventgroup_t _eventgroup, @@ -114,7 +121,7 @@ class service_discovery_impl : public service_discovery, std::shared_ptr<service_discovery_fsm> default_; std::map<std::string, std::shared_ptr<service_discovery_fsm> > additional_; - std::map<service_t, std::map<instance_t, std::shared_ptr<request> > > requested_; + requests_t requested_; std::map<service_t, std::map<instance_t, std::map<eventgroup_t, std::shared_ptr<subscription> > > > subscribed_; diff --git a/implementation/service_discovery/src/ipv4_option_impl.cpp b/implementation/service_discovery/src/ipv4_option_impl.cpp index 33dd864..ad7449e 100644 --- a/implementation/service_discovery/src/ipv4_option_impl.cpp +++ b/implementation/service_discovery/src/ipv4_option_impl.cpp @@ -17,6 +17,7 @@ namespace sd { ipv4_option_impl::ipv4_option_impl(bool _is_multicast) {
length_ = (1 + 4 + 1 + 1 + 2);
type_ = (_is_multicast ? option_type_e::IP4_MULTICAST : option_type_e::IP4_ENDPOINT);
+ is_udp_ = _is_multicast;
}
ipv4_option_impl::~ipv4_option_impl() {
diff --git a/implementation/service_discovery/src/ipv6_option_impl.cpp b/implementation/service_discovery/src/ipv6_option_impl.cpp index 26a711d..1017f40 100755 --- a/implementation/service_discovery/src/ipv6_option_impl.cpp +++ b/implementation/service_discovery/src/ipv6_option_impl.cpp @@ -17,6 +17,7 @@ namespace sd { ipv6_option_impl::ipv6_option_impl(bool _is_multicast) {
length_ = (1 + 16 + 1 + 1 + 2);
type_ = (_is_multicast ? option_type_e::IP6_MULTICAST : option_type_e::IP6_ENDPOINT) ;
+ is_udp_ = _is_multicast;
}
ipv6_option_impl::~ipv6_option_impl() {
diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp index 1f99c8a..fc63632 100644 --- a/implementation/service_discovery/src/service_discovery_impl.cpp +++ b/implementation/service_discovery/src/service_discovery_impl.cpp @@ -24,6 +24,7 @@ #include "../include/subscription.hpp" #include "../../configuration/include/internal.hpp" #include "../../endpoints/include/endpoint.hpp" +#include "../../endpoints/include/endpoint_definition.hpp" #include "../../endpoints/include/tcp_server_endpoint_impl.hpp" #include "../../endpoints/include/udp_server_endpoint_impl.hpp" #include "../../message/include/serializer.hpp" @@ -171,6 +172,28 @@ void service_discovery_impl::unsubscribe(service_t _service, } } +session_t service_discovery_impl::get_session(const boost::asio::ip::address &_address) { + session_t its_session; + auto found_session = sessions_.find(_address); + if (found_session == sessions_.end()) { + its_session = sessions_[_address] = 1; + } else { + its_session = found_session->second; + } + return its_session; +} + +void service_discovery_impl::increment_session(const boost::asio::ip::address &_address) { + auto found_session = sessions_.find(_address); + if (found_session != sessions_.end()) { + found_session->second ++; + if (0 == found_session->second) { + found_session->second++; + // TODO: what about the reboot flag? + } + } +} + void service_discovery_impl::insert_option( std::shared_ptr<message_impl> &_message, std::shared_ptr<entry_impl> _entry, const boost::asio::ip::address &_address, uint16_t _port, bool _is_reliable) { @@ -219,18 +242,36 @@ void service_discovery_impl::insert_option( } } -void service_discovery_impl::insert_service_entries( - std::shared_ptr<message_impl> &_message, services_t &_services, - bool _is_offer) { +void service_discovery_impl::insert_find_entries( + std::shared_ptr<message_impl> &_message, requests_t &_requests) { + for (auto its_service : _requests) { + for (auto its_instance : its_service.second) { + auto its_request = its_instance.second; + std::shared_ptr<serviceentry_impl> its_entry + = _message->create_service_entry(); + if (its_entry) { + its_entry->set_type(entry_type_e::FIND_SERVICE); + its_entry->set_service(its_service.first); + its_entry->set_instance(its_instance.first); + its_entry->set_major_version(its_request->get_major()); + its_entry->set_minor_version(its_request->get_minor()); + its_entry->set_ttl(its_request->get_ttl()); + } else { + VSOMEIP_ERROR << "Failed to create service entry!"; + } + } + } +} + +void service_discovery_impl::insert_offer_entries( + std::shared_ptr<message_impl> &_message, services_t &_services) { for (auto its_service : _services) { for (auto its_instance : its_service.second) { auto its_info = its_instance.second; std::shared_ptr<serviceentry_impl> its_entry = _message ->create_service_entry(); if (its_entry) { - its_entry->set_type( - (_is_offer ? - entry_type_e::OFFER_SERVICE : entry_type_e::FIND_SERVICE)); + its_entry->set_type(entry_type_e::OFFER_SERVICE); its_entry->set_service(its_service.first); its_entry->set_instance(its_instance.first); its_entry->set_major_version(its_info->get_major()); @@ -311,16 +352,20 @@ void service_discovery_impl::send(const std::string &_name, // If we are the default group and not in main phase, include "FindOffer"-entries if (_name == "default" && !_is_announcing) { - //insert_service_entries(its_message, requested_, false); + insert_find_entries(its_message, requested_); } // Always include the "OfferService"-entries for the service group services_t its_offers = host_->get_offered_services(_name); - insert_service_entries(its_message, its_offers, true); + insert_offer_entries(its_message, its_offers); // Serialize and send - if (its_message->get_entries().size() > 0) - host_->send(VSOMEIP_SD_CLIENT, its_message, true, false); + if (its_message->get_entries().size() > 0) { + its_message->set_session(get_session(unicast_)); + if (host_->send(VSOMEIP_SD_CLIENT, its_message, true, false)) { + increment_session(unicast_); + } + } } // Interface endpoint_host @@ -554,11 +599,16 @@ void service_discovery_impl::handle_service_availability( its_eventgroup.first, its_subscription, true); } } - serializer_->serialize(its_message.get()); - host_->send_subscribe(_address, port_, reliable_, - serializer_->get_data(), - serializer_->get_size()); - serializer_->reset(); + + if (0 < its_message->get_entries().size()) { + its_message->set_session(get_session(_address)); + serializer_->serialize(its_message.get()); + if (host_->send_to(_address, port_, reliable_, + serializer_->get_data(), serializer_->get_size())) { + increment_session(_address); + } + serializer_->reset(); + } } } } @@ -589,17 +639,35 @@ void service_discovery_impl::handle_eventgroup_subscription( // Could not find eventgroup --> send Nack if (!its_info || _major > its_info->get_major() || _ttl > its_info->get_ttl()) { + its_info = std::make_shared<eventgroupinfo>(_major, 0); } else { - host_->add_subscription(_service, _instance, _eventgroup, _address, - _reliable_port, _unreliable_port); + boost::asio::ip::address its_address; + uint16_t its_port; + bool its_reliable; + if (VSOMEIP_INVALID_PORT != _unreliable_port) { + if (!its_info->get_multicast(its_address, its_port)) { + its_address = _address; + its_port = _unreliable_port; + } + its_reliable = false; + } else { + its_address = _address; + its_port = _reliable_port; + its_reliable = true; + } + host_->on_subscribe(_service, _instance, _eventgroup, + std::make_shared<endpoint_definition>(its_address, its_port, its_reliable)); } insert_subscription_ack(its_message, _service, _instance, _eventgroup, its_info, false); + its_message->set_session(get_session(_address)); serializer_->serialize(its_message.get()); - host_->send_subscribe(_address, port_, reliable_, serializer_->get_data(), - serializer_->get_size()); + if (host_->send_to(_address, port_, reliable_, + serializer_->get_data(), serializer_->get_size())) { + increment_session(_address); + } serializer_->reset(); } } @@ -615,6 +683,9 @@ void service_discovery_impl::handle_eventgroup_subscription_ack( auto found_eventgroup = found_instance->second.find(_eventgroup); if (found_eventgroup != found_instance->second.end()) { found_eventgroup->second->set_acknowledged(true); + if (_address.is_multicast()) { + host_->on_subscribe_ack(_service, _instance, _address, _port); + } } } } diff --git a/interface/vsomeip/application.hpp b/interface/vsomeip/application.hpp index 38038be..a8bfb9f 100644 --- a/interface/vsomeip/application.hpp +++ b/interface/vsomeip/application.hpp @@ -40,12 +40,6 @@ public: virtual void stop_offer_service(service_t _service, instance_t _instance) = 0; - virtual void publish_eventgroup(service_t _service, instance_t _instance, - eventgroup_t _eventgroup, major_version_t _major, ttl_t _ttl) = 0; - - virtual void stop_publish_eventgroup(service_t _service, - instance_t _instance, eventgroup_t _eventgroup) = 0; - // Consume services virtual void request_service(service_t _service, instance_t _instance, major_version_t _major = ANY_MAJOR, minor_version_t _minor = |