diff options
Diffstat (limited to 'implementation/endpoints/src/udp_server_endpoint_impl.cpp')
-rw-r--r-- | implementation/endpoints/src/udp_server_endpoint_impl.cpp | 492 |
1 files changed, 360 insertions, 132 deletions
diff --git a/implementation/endpoints/src/udp_server_endpoint_impl.cpp b/implementation/endpoints/src/udp_server_endpoint_impl.cpp index 6ad7ce8..0a65b2e 100644 --- a/implementation/endpoints/src/udp_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/udp_server_endpoint_impl.cpp @@ -9,70 +9,87 @@ #include <boost/asio/ip/multicast.hpp> #include <vsomeip/constants.hpp> +#include <vsomeip/internal/logger.hpp> #include "../include/endpoint_definition.hpp" #include "../include/endpoint_host.hpp" +#include "../include/tp.hpp" +#include "../../routing/include/routing_host.hpp" #include "../include/udp_server_endpoint_impl.hpp" #include "../../configuration/include/configuration.hpp" -#include "../../logging/include/logger.hpp" #include "../../utility/include/byteorder.hpp" #include "../../utility/include/utility.hpp" #include "../../service_discovery/include/defines.hpp" namespace ip = boost::asio::ip; -namespace vsomeip { +namespace vsomeip_v3 { udp_server_endpoint_impl::udp_server_endpoint_impl( - std::shared_ptr< endpoint_host > _host, - endpoint_type _local, + const std::shared_ptr<endpoint_host>& _endpoint_host, + const std::shared_ptr<routing_host>& _routing_host, + const endpoint_type& _local, boost::asio::io_service &_io, - configuration::endpoint_queue_limit_t _queue_limit, - std::uint32_t _udp_receive_buffer_size) - : server_endpoint_impl<ip::udp_ext>( - _host, _local, _io, VSOMEIP_MAX_UDP_MESSAGE_SIZE, _queue_limit), - socket_(_io, _local.protocol()), + const std::shared_ptr<configuration>& _configuration) : + server_endpoint_impl<ip::udp_ext>(_endpoint_host, _routing_host, _local, + _io, VSOMEIP_MAX_UDP_MESSAGE_SIZE, + _configuration->get_endpoint_queue_limit(_configuration->get_unicast_address().to_string(), _local.port()), + _configuration), + unicast_socket_(_io, _local.protocol()), + unicast_recv_buffer_(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0), joined_group_(false), - recv_buffer_(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0), - local_port_(_local.port()) { + local_port_(_local.port()), + tp_reassembler_(std::make_shared<tp::tp_reassembler>(_configuration->get_max_message_size_unreliable(), _io)), + tp_cleanup_timer_(_io) { + is_supporting_someip_tp_ = true; + boost::system::error_code ec; boost::asio::socket_base::reuse_address optionReuseAddress(true); - socket_.set_option(optionReuseAddress, ec); + unicast_socket_.set_option(optionReuseAddress, ec); boost::asio::detail::throw_error(ec, "reuse address"); - socket_.bind(_local, ec); +#ifndef _WIN32 + // If specified, bind to device + std::string its_device(configuration_->get_device()); + if (its_device != "") { + if (setsockopt(unicast_socket_.native_handle(), + SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (int)its_device.size()) == -1) { + VSOMEIP_WARNING << "UDP Server: Could not bind to device \"" << its_device << "\""; + } + } +#endif + + unicast_socket_.bind(_local, ec); boost::asio::detail::throw_error(ec, "bind"); - if (_local.address().is_v4()) { - boost::asio::ip::address_v4 its_unicast_address - = _host->get_configuration()->get_unicast_address().to_v4(); - boost::asio::ip::multicast::outbound_interface option(its_unicast_address); - socket_.set_option(option, ec); + if (local_.address().is_v4()) { + boost::asio::ip::multicast::outbound_interface option(_local.address().to_v4()); + unicast_socket_.set_option(option, ec); boost::asio::detail::throw_error(ec, "outbound interface option IPv4"); - } else if (_local.address().is_v6()) { - boost::asio::ip::address_v6 its_unicast_address - = _host->get_configuration()->get_unicast_address().to_v6(); + } else if (local_.address().is_v6()) { boost::asio::ip::multicast::outbound_interface option( - static_cast<unsigned int>(its_unicast_address.scope_id())); - socket_.set_option(option, ec); + static_cast<unsigned int>(local_.address().to_v6().scope_id())); + unicast_socket_.set_option(option, ec); boost::asio::detail::throw_error(ec, "outbound interface option IPv6"); } boost::asio::socket_base::broadcast option(true); - socket_.set_option(option, ec); + unicast_socket_.set_option(option, ec); boost::asio::detail::throw_error(ec, "broadcast option"); + const std::uint32_t its_udp_recv_buffer_size = + configuration_->get_udp_receive_buffer_size(); + unicast_socket_.set_option(boost::asio::socket_base::receive_buffer_size( + its_udp_recv_buffer_size), ec); - socket_.set_option(boost::asio::socket_base::receive_buffer_size( - _udp_receive_buffer_size), ec); if (ec) { VSOMEIP_WARNING << "udp_server_endpoint_impl:: couldn't set " << "SO_RCVBUF: " << ec.message() << " to: " << std::dec - << _udp_receive_buffer_size << " local port: " << std::dec + << its_udp_recv_buffer_size << " local port: " << std::dec << local_port_; } else { boost::asio::socket_base::receive_buffer_size its_option; - socket_.get_option(its_option, ec); + unicast_socket_.get_option(its_option, ec); if (ec) { VSOMEIP_WARNING << "udp_server_endpoint_impl: couldn't get " << "SO_RCVBUF: " << ec.message() << " local port:" @@ -83,14 +100,13 @@ udp_server_endpoint_impl::udp_server_endpoint_impl( } } - #ifdef _WIN32 const char* optval("0001"); - ::setsockopt(socket_.native(), IPPROTO_IP, IP_PKTINFO, + ::setsockopt(unicast_socket_.native_handle(), IPPROTO_IP, IP_PKTINFO, optval, sizeof(optval)); #else int optval(1); - ::setsockopt(socket_.native(), IPPROTO_IP, IP_PKTINFO, + ::setsockopt(unicast_socket_.native_handle(), IPPROTO_IP, IP_PKTINFO, &optval, sizeof(optval)); #endif } @@ -108,24 +124,63 @@ void udp_server_endpoint_impl::start() { void udp_server_endpoint_impl::stop() { server_endpoint_impl::stop(); + { - std::lock_guard<std::mutex> its_lock(socket_mutex_); - if (socket_.is_open()) { + std::lock_guard<std::mutex> its_lock(unicast_socket_mutex_); + + if (unicast_socket_.is_open()) { boost::system::error_code its_error; - socket_.shutdown(socket_type::shutdown_both, its_error); - socket_.close(its_error); + unicast_socket_.shutdown(socket_type::shutdown_both, its_error); + unicast_socket_.close(its_error); } } + + { + std::lock_guard<std::mutex> its_lock(multicast_socket_mutex_); + + if (multicast_socket_ && multicast_socket_->is_open()) { + boost::system::error_code its_error; + multicast_socket_->shutdown(socket_type::shutdown_both, its_error); + multicast_socket_->close(its_error); + } + } + + tp_reassembler_->stop(); } void udp_server_endpoint_impl::receive() { - std::lock_guard<std::mutex> its_lock(socket_mutex_); - if(socket_.is_open()) { - socket_.async_receive_from( - boost::asio::buffer(&recv_buffer_[0], max_message_size_), - remote_, + receive_unicast(); + receive_multicast(); +} + +void udp_server_endpoint_impl::receive_unicast() { + std::lock_guard<std::mutex> its_lock(unicast_socket_mutex_); + + if(unicast_socket_.is_open()) { + unicast_socket_.async_receive_from( + boost::asio::buffer(&unicast_recv_buffer_[0], max_message_size_), + unicast_remote_, std::bind( - &udp_server_endpoint_impl::receive_cbk, + &udp_server_endpoint_impl::on_unicast_received, + std::dynamic_pointer_cast< + udp_server_endpoint_impl >(shared_from_this()), + std::placeholders::_1, + std::placeholders::_2, + std::placeholders::_3 + ) + ); + } +} + +void udp_server_endpoint_impl::receive_multicast() { + std::lock_guard<std::mutex> its_lock(multicast_socket_mutex_); + + if (multicast_socket_ && multicast_socket_->is_open()) { + multicast_socket_->async_receive_from( + boost::asio::buffer(&(*multicast_recv_buffer_)[0], max_message_size_), + multicast_remote_, + std::bind( + &udp_server_endpoint_impl::on_multicast_received, std::dynamic_pointer_cast< udp_server_endpoint_impl >(shared_from_this()), std::placeholders::_1, @@ -138,14 +193,39 @@ void udp_server_endpoint_impl::receive() { bool udp_server_endpoint_impl::send_to( const std::shared_ptr<endpoint_definition> _target, - const byte_t *_data, uint32_t _size, bool _flush) { + const byte_t *_data, uint32_t _size) { std::lock_guard<std::mutex> its_lock(mutex_); endpoint_type its_target(_target->get_address(), _target->get_port()); - return send_intern(its_target, _data, _size, _flush); + return send_intern(its_target, _data, _size); +} + +bool udp_server_endpoint_impl::send_error( + const std::shared_ptr<endpoint_definition> _target, + const byte_t *_data, uint32_t _size) { + bool ret(false); + std::lock_guard<std::mutex> its_lock(mutex_); + const endpoint_type its_target(_target->get_address(), _target->get_port()); + const queue_iterator_type target_queue_iterator(find_or_create_queue_unlocked(its_target)); + auto& its_qpair = target_queue_iterator->second; + const bool queue_size_zero_on_entry(its_qpair.second.empty()); + + if (check_message_size(nullptr, _size, its_target) == endpoint_impl::cms_ret_e::MSG_OK && + check_queue_limit(_data, _size, its_qpair.first)) { + its_qpair.second.emplace_back( + std::make_shared<message_buffer_t>(_data, _data + _size)); + its_qpair.first += _size; + + if (queue_size_zero_on_entry) { // no writing in progress + send_queued(target_queue_iterator); + } + ret = true; + } + return ret; } void udp_server_endpoint_impl::send_queued( const queue_iterator_type _queue_iterator) { + message_buffer_ptr_t its_buffer = _queue_iterator->second.second.front(); #if 0 std::stringstream msg; @@ -156,9 +236,9 @@ void udp_server_endpoint_impl::send_queued( << (int)(*its_buffer)[i] << " "; VSOMEIP_INFO << msg.str(); #endif - { - std::lock_guard<std::mutex> its_lock(socket_mutex_); - socket_.async_send_to( + std::lock_guard<std::mutex> its_lock(unicast_socket_mutex_); + + unicast_socket_.async_send_to( boost::asio::buffer(*its_buffer), _queue_iterator->first, std::bind( @@ -169,7 +249,16 @@ void udp_server_endpoint_impl::send_queued( std::placeholders::_2 ) ); - } +} + +void udp_server_endpoint_impl::get_configured_times_from_endpoint( + service_t _service, method_t _method, + std::chrono::nanoseconds *_debouncing, + std::chrono::nanoseconds *_maximum_retention) const { + configuration_->get_configured_timing_responses(_service, + udp_server_endpoint_base_impl::local_.address().to_string(), + udp_server_endpoint_base_impl::local_.port(), _method, + _debouncing, _maximum_retention); } bool udp_server_endpoint_impl::is_joined(const std::string &_address) const { @@ -193,6 +282,70 @@ void udp_server_endpoint_impl::join(const std::string &_address) { auto join_func = [this](const std::string &_address) { try { + VSOMEIP_TRACE << "Joining to multicast group " << _address + << " from " << local_.address().to_string(); + + boost::system::error_code ec; + + if (!multicast_recv_buffer_) { + multicast_recv_buffer_ = std::unique_ptr<message_buffer_t>( + new message_buffer_t(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0)); + } + + if (!multicast_ep_) { + multicast_ep_ = std::unique_ptr<endpoint_type>( + new endpoint_type(boost::asio::ip::address_v4::any(), local_port_)); + } + + if (!multicast_socket_) { + multicast_socket_ = std::unique_ptr<socket_type>( + new socket_type(service_, local_.protocol())); + + boost::asio::socket_base::reuse_address optionReuseAddress(true); + multicast_socket_->set_option(optionReuseAddress, ec); + boost::asio::detail::throw_error(ec, "reuse address in multicast"); + + multicast_socket_->bind(*multicast_ep_, ec); + boost::asio::detail::throw_error(ec, "bind multicast"); + + const std::uint32_t its_udp_recv_buffer_size = + configuration_->get_udp_receive_buffer_size(); + + multicast_socket_->set_option(boost::asio::socket_base::receive_buffer_size( + its_udp_recv_buffer_size), ec); + + if (ec) { + VSOMEIP_WARNING << "udp_server_endpoint_impl:: couldn't set " + << "SO_RCVBUF: " << ec.message() << " to: " << std::dec + << its_udp_recv_buffer_size << " local port: " << std::dec + << local_port_; + } else { + boost::asio::socket_base::receive_buffer_size its_option; + multicast_socket_->get_option(its_option, ec); + + if (ec) { + VSOMEIP_WARNING << "udp_server_endpoint_impl: couldn't get " + << "SO_RCVBUF: " << ec.message() << " local port:" + << std::dec << local_port_; + } else { + VSOMEIP_INFO << "udp_server_endpoint_impl: SO_RCVBUF (Multicast) is: " + << std::dec << its_option.value(); + } + } + +#ifdef _WIN32 + const char* optval("0001"); + ::setsockopt(multicast_socket_->native_handle(), IPPROTO_IP, IP_PKTINFO, + optval, sizeof(optval)); +#else + int optval(1); + ::setsockopt(multicast_socket_->native_handle(), IPPROTO_IP, IP_PKTINFO, + &optval, sizeof(optval)); +#endif + + receive_multicast(); + } + bool is_v4(false); bool is_v6(false); { @@ -200,30 +353,31 @@ void udp_server_endpoint_impl::join(const std::string &_address) { is_v4 = local_.address().is_v4(); is_v6 = local_.address().is_v6(); } + if (is_v4) { - std::lock_guard<std::mutex> its_lock(socket_mutex_); - socket_.set_option(ip::udp_ext::socket::reuse_address(true)); - socket_.set_option( + std::lock_guard<std::mutex> its_lock(multicast_socket_mutex_); + multicast_socket_->set_option(ip::udp_ext::socket::reuse_address(true)); + multicast_socket_->set_option( boost::asio::ip::multicast::enable_loopback(false)); #ifdef _WIN32 - socket_.set_option(boost::asio::ip::multicast::join_group( + multicast_socket_->set_option(boost::asio::ip::multicast::join_group( boost::asio::ip::address::from_string(_address).to_v4(), local_.address().to_v4())); #else - socket_.set_option(boost::asio::ip::multicast::join_group( + multicast_socket_->set_option(boost::asio::ip::multicast::join_group( boost::asio::ip::address::from_string(_address).to_v4())); #endif } else if (is_v6) { - std::lock_guard<std::mutex> its_lock(socket_mutex_); - socket_.set_option(ip::udp_ext::socket::reuse_address(true)); - socket_.set_option( + std::lock_guard<std::mutex> its_lock(multicast_socket_mutex_); + multicast_socket_->set_option(ip::udp_ext::socket::reuse_address(true)); + multicast_socket_->set_option( boost::asio::ip::multicast::enable_loopback(false)); #ifdef _WIN32 - socket_.set_option(boost::asio::ip::multicast::join_group( + multicast_socket_->set_option(boost::asio::ip::multicast::join_group( boost::asio::ip::address::from_string(_address).to_v6(), local_.address().to_v6().scope_id())); #else - socket_.set_option(boost::asio::ip::multicast::join_group( + multicast_socket_->set_option(boost::asio::ip::multicast::join_group( boost::asio::ip::address::from_string(_address).to_v6())); #endif } @@ -249,6 +403,9 @@ void udp_server_endpoint_impl::join(const std::string &_address) { void udp_server_endpoint_impl::leave(const std::string &_address) { try { if (is_joined(_address)) { + VSOMEIP_TRACE << "Leaving the multicast group " << _address + << " from " << local_.address().to_string(); + bool is_v4(false); bool is_v6(false); { @@ -257,12 +414,12 @@ void udp_server_endpoint_impl::leave(const std::string &_address) { is_v6 = local_.address().is_v6(); } if (is_v4) { - std::lock_guard<std::mutex> its_lock(socket_mutex_); - socket_.set_option(boost::asio::ip::multicast::leave_group( + std::lock_guard<std::mutex> its_lock(multicast_socket_mutex_); + multicast_socket_->set_option(boost::asio::ip::multicast::leave_group( boost::asio::ip::address::from_string(_address))); } else if (is_v6) { - std::lock_guard<std::mutex> its_lock(socket_mutex_); - socket_.set_option(boost::asio::ip::multicast::leave_group( + std::lock_guard<std::mutex> its_lock(multicast_socket_mutex_); + multicast_socket_->set_option(boost::asio::ip::multicast::leave_group( boost::asio::ip::address::from_string(_address))); } { @@ -270,6 +427,10 @@ void udp_server_endpoint_impl::leave(const std::string &_address) { joined_.erase(_address); if (!joined_.size()) { joined_group_ = false; + + multicast_socket_.reset(nullptr); + multicast_ep_.reset(nullptr); + multicast_recv_buffer_.reset(nullptr); } } } @@ -308,10 +469,32 @@ std::uint16_t udp_server_endpoint_impl::get_local_port() const { return local_port_; } -// TODO: find a better way to structure the receive functions -void udp_server_endpoint_impl::receive_cbk( - boost::system::error_code const &_error, std::size_t _bytes, +void udp_server_endpoint_impl::on_unicast_received( + boost::system::error_code const &_error, + std::size_t _bytes, + boost::asio::ip::address const &_destination) { + on_message_received(_error, _bytes, _destination, unicast_remote_, unicast_recv_buffer_); + receive_unicast(); +} + +void udp_server_endpoint_impl::on_multicast_received( + boost::system::error_code const &_error, + std::size_t _bytes, boost::asio::ip::address const &_destination) { + + // Filter messages sent from the same source address + if (multicast_remote_.address() != local_.address()) { + on_message_received(_error, _bytes, _destination, multicast_remote_, *multicast_recv_buffer_); + } + + receive_multicast(); +} + +void udp_server_endpoint_impl::on_message_received( + boost::system::error_code const &_error, std::size_t _bytes, + boost::asio::ip::address const &_destination, + endpoint_type const &_remote, + message_buffer_t const &_buffer) { #if 0 std::stringstream msg; msg << "usei::rcb(" << _error.message() << "): "; @@ -320,16 +503,17 @@ void udp_server_endpoint_impl::receive_cbk( << (int) recv_buffer_[i] << " "; VSOMEIP_INFO << msg.str(); #endif - std::shared_ptr<endpoint_host> its_host = this->host_.lock(); + std::shared_ptr<routing_host> its_host = routing_host_.lock(); + if (its_host) { if (!_error && 0 < _bytes) { std::size_t remaining_bytes = _bytes; std::size_t i = 0; - const boost::asio::ip::address its_remote_address(remote_.address()); - const std::uint16_t its_remote_port(remote_.port()); + const boost::asio::ip::address its_remote_address(_remote.address()); + const std::uint16_t its_remote_port(_remote.port()); do { uint64_t read_message_size - = utility::get_message_size(&this->recv_buffer_[i], + = utility::get_message_size(&_buffer[i], remaining_bytes); if (read_message_size > MESSAGE_SIZE_UNLIMITED) { VSOMEIP_ERROR << "Message size exceeds allowed maximum!"; @@ -342,62 +526,58 @@ void udp_server_endpoint_impl::receive_cbk( VSOMEIP_ERROR << "buffer underflow in udp client endpoint ~> abort!"; return; } else if (current_message_size > VSOMEIP_RETURN_CODE_POS && - (recv_buffer_[i + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION || - !utility::is_valid_message_type(static_cast<message_type_e>(recv_buffer_[i + VSOMEIP_MESSAGE_TYPE_POS])) || - !utility::is_valid_return_code(static_cast<return_code_e>(recv_buffer_[i + VSOMEIP_RETURN_CODE_POS])) + (_buffer[i + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION || + !utility::is_valid_message_type(tp::tp::tp_flag_unset(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS])) || + !utility::is_valid_return_code(static_cast<return_code_e>(_buffer[i + VSOMEIP_RETURN_CODE_POS])) )) { - if (recv_buffer_[i + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION) { + if (_buffer[i + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION) { VSOMEIP_ERROR << "use: Wrong protocol version: 0x" << std::hex << std::setw(2) << std::setfill('0') - << std::uint32_t(recv_buffer_[i + VSOMEIP_PROTOCOL_VERSION_POS]) + << std::uint32_t(_buffer[i + VSOMEIP_PROTOCOL_VERSION_POS]) << " local: " << get_address_port_local() << " remote: " << its_remote_address << ":" << std::dec << its_remote_port; // ensure to send back a message w/ wrong protocol version - its_host->on_message(&recv_buffer_[i], + its_host->on_message(&_buffer[i], VSOMEIP_SOMEIP_HEADER_SIZE + 8, this, _destination, VSOMEIP_ROUTING_CLIENT, - its_remote_address, - its_remote_port); - } else if (!utility::is_valid_message_type(static_cast<message_type_e>( - recv_buffer_[i + VSOMEIP_MESSAGE_TYPE_POS]))) { + std::make_pair(ANY_UID, ANY_GID), + its_remote_address, its_remote_port); + } else if (!utility::is_valid_message_type(tp::tp::tp_flag_unset( + _buffer[i + VSOMEIP_MESSAGE_TYPE_POS]))) { VSOMEIP_ERROR << "use: Invalid message type: 0x" << std::hex << std::setw(2) << std::setfill('0') - << std::uint32_t(recv_buffer_[i + VSOMEIP_MESSAGE_TYPE_POS]) + << std::uint32_t(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS]) << " local: " << get_address_port_local() << " remote: " << its_remote_address << ":" << std::dec << its_remote_port; } else if (!utility::is_valid_return_code(static_cast<return_code_e>( - recv_buffer_[i + VSOMEIP_RETURN_CODE_POS]))) { + _buffer[i + VSOMEIP_RETURN_CODE_POS]))) { VSOMEIP_ERROR << "use: Invalid return code: 0x" << std::hex << std::setw(2) << std::setfill('0') - << std::uint32_t(recv_buffer_[i + VSOMEIP_RETURN_CODE_POS]) + << std::uint32_t(_buffer[i + VSOMEIP_RETURN_CODE_POS]) << " local: " << get_address_port_local() << " remote: " << its_remote_address << ":" << std::dec << its_remote_port; } - receive(); return; } remaining_bytes -= current_message_size; - service_t its_service = VSOMEIP_BYTES_TO_WORD(recv_buffer_[i + VSOMEIP_SERVICE_POS_MIN], - recv_buffer_[i + VSOMEIP_SERVICE_POS_MAX]); + const service_t its_service = VSOMEIP_BYTES_TO_WORD(_buffer[i + VSOMEIP_SERVICE_POS_MIN], + _buffer[i + VSOMEIP_SERVICE_POS_MAX]); if (utility::is_request( - recv_buffer_[i + VSOMEIP_MESSAGE_TYPE_POS])) { - client_t its_client; - std::memcpy(&its_client, - &recv_buffer_[i + VSOMEIP_CLIENT_POS_MIN], - sizeof(client_t)); - if (its_client != MAGIC_COOKIE_NETWORK_BYTE_ORDER) { - session_t its_session; - std::memcpy(&its_session, - &recv_buffer_[i + VSOMEIP_SESSION_POS_MIN], - sizeof(session_t)); + _buffer[i + VSOMEIP_MESSAGE_TYPE_POS])) { + const client_t its_client = VSOMEIP_BYTES_TO_WORD( + _buffer[i + VSOMEIP_CLIENT_POS_MIN], + _buffer[i + VSOMEIP_CLIENT_POS_MAX]); + if (its_client != MAGIC_COOKIE_CLIENT) { + const session_t its_session = VSOMEIP_BYTES_TO_WORD( + _buffer[i + VSOMEIP_SESSION_POS_MIN], + _buffer[i + VSOMEIP_SESSION_POS_MAX]); clients_mutex_.lock(); - clients_[its_client][its_session] = remote_; - endpoint_to_client_[remote_] = its_client; + clients_[its_client][its_session] = _remote; clients_mutex_.unlock(); } } else if (its_service != VSOMEIP_SD_SERVICE - && utility::is_notification(recv_buffer_[i + VSOMEIP_MESSAGE_TYPE_POS]) + && utility::is_notification(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS]) && joined_group_) { std::lock_guard<std::mutex> its_lock(joined_mutex_); boost::system::error_code ec; @@ -406,61 +586,91 @@ void udp_server_endpoint_impl::receive_cbk( found_address->second = true; } } - if (its_service != VSOMEIP_SD_SERVICE || - (current_message_size > VSOMEIP_SOMEIP_HEADER_SIZE && - current_message_size >= remaining_bytes)) { - its_host->on_message(&recv_buffer_[i], - current_message_size, this, _destination, - VSOMEIP_ROUTING_CLIENT, its_remote_address, - its_remote_port); + if (tp::tp::tp_flag_is_set(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS])) { + const auto res = tp_reassembler_->process_tp_message( + &_buffer[i], current_message_size, + its_remote_address, its_remote_port); + if (res.first) { + if (utility::is_request(res.second[VSOMEIP_MESSAGE_TYPE_POS])) { + const client_t its_client = VSOMEIP_BYTES_TO_WORD( + res.second[VSOMEIP_CLIENT_POS_MIN], + res.second[VSOMEIP_CLIENT_POS_MAX]); + if (its_client != MAGIC_COOKIE_CLIENT) { + const session_t its_session = VSOMEIP_BYTES_TO_WORD( + res.second[VSOMEIP_SESSION_POS_MIN], + res.second[VSOMEIP_SESSION_POS_MAX]); + clients_mutex_.lock(); + clients_[its_client][its_session] = _remote; + clients_mutex_.unlock(); + } + } else if (its_service != VSOMEIP_SD_SERVICE + && utility::is_notification(res.second[VSOMEIP_MESSAGE_TYPE_POS]) + && joined_group_) { + std::lock_guard<std::mutex> its_lock(joined_mutex_); + boost::system::error_code ec; + const auto found_address = joined_.find(_destination.to_string(ec)); + if (found_address != joined_.end()) { + found_address->second = true; + } + } + its_host->on_message(&res.second[0], + static_cast<std::uint32_t>(res.second.size()), + this, _destination, VSOMEIP_ROUTING_CLIENT, + std::make_pair(ANY_UID, ANY_GID), + its_remote_address, its_remote_port); + } } else { - //ignore messages for service discovery with shorter SomeIP length - VSOMEIP_ERROR << "Received an unreliable vSomeIP SD message with too short length field"; + if (its_service != VSOMEIP_SD_SERVICE || + (current_message_size > VSOMEIP_SOMEIP_HEADER_SIZE && + current_message_size >= remaining_bytes)) { + its_host->on_message(&_buffer[i], + current_message_size, this, _destination, + VSOMEIP_ROUTING_CLIENT, + std::make_pair(ANY_UID, ANY_GID), + its_remote_address, its_remote_port); + } else { + //ignore messages for service discovery with shorter SomeIP length + VSOMEIP_ERROR << "Received an unreliable vSomeIP SD message with too short length field" + << " local: " << get_address_port_local() + << " remote: " << its_remote_address << ":" << std::dec << its_remote_port; + } } i += current_message_size; } else { - VSOMEIP_ERROR << "Received an unreliable vSomeIP message with bad length field"; + VSOMEIP_ERROR << "Received an unreliable vSomeIP message with bad length field" + << " local: " << get_address_port_local() + << " remote: " << its_remote_address << ":" << std::dec << its_remote_port; if (remaining_bytes > VSOMEIP_SERVICE_POS_MAX) { - service_t its_service = VSOMEIP_BYTES_TO_WORD(recv_buffer_[VSOMEIP_SERVICE_POS_MIN], - recv_buffer_[VSOMEIP_SERVICE_POS_MAX]); + service_t its_service = VSOMEIP_BYTES_TO_WORD(_buffer[VSOMEIP_SERVICE_POS_MIN], + _buffer[VSOMEIP_SERVICE_POS_MAX]); if (its_service != VSOMEIP_SD_SERVICE) { if (read_message_size == 0) { VSOMEIP_ERROR << "Ignoring unreliable vSomeIP message with SomeIP message length 0!"; } else { - its_host->on_error(&recv_buffer_[i], - (uint32_t)remaining_bytes, this, - its_remote_address, its_remote_port); + auto its_endpoint_host = endpoint_host_.lock(); + if (its_endpoint_host) { + its_endpoint_host->on_error(&_buffer[i], + (uint32_t)remaining_bytes, this, + its_remote_address, its_remote_port); + } } } } remaining_bytes = 0; } } while (remaining_bytes > 0); - receive(); - } else { - receive(); } } } -client_t udp_server_endpoint_impl::get_client(std::shared_ptr<endpoint_definition> _endpoint) { - const endpoint_type endpoint(_endpoint->get_address(), _endpoint->get_port()); - std::lock_guard<std::mutex> its_lock(clients_mutex_); - auto found_endpoint = endpoint_to_client_.find(endpoint); - if (found_endpoint != endpoint_to_client_.end()) { - // TODO: Check system byte order before convert! - const client_t client = client_t(found_endpoint->second << 8 | found_endpoint->second >> 8); - return client; - } - return 0; -} - void udp_server_endpoint_impl::print_status() { std::lock_guard<std::mutex> its_lock(mutex_); VSOMEIP_INFO << "status use: " << std::dec << local_port_ << " number queues: " << std::dec << queues_.size() - << " recv_buffer: " << std::dec << recv_buffer_.capacity(); + << " recv_buffer: " << std::dec << unicast_recv_buffer_.capacity() + << " multicast_recv_buffer: " << std::dec + << (multicast_recv_buffer_ ? multicast_recv_buffer_->capacity() : 0); for (const auto &c : queues_) { std::size_t its_data_size(0); @@ -484,13 +694,24 @@ std::string udp_server_endpoint_impl::get_remote_information( + std::to_string(_queue_iterator->first.port()); } +std::string udp_server_endpoint_impl::get_remote_information( + const endpoint_type& _remote) const { + boost::system::error_code ec; + return _remote.address().to_string(ec) + ":" + + std::to_string(_remote.port()); +} + +bool udp_server_endpoint_impl::is_reliable() const { + return false; +} + const std::string udp_server_endpoint_impl::get_address_port_local() const { - std::lock_guard<std::mutex> its_lock(socket_mutex_); + std::lock_guard<std::mutex> its_lock(unicast_socket_mutex_); std::string its_address_port; its_address_port.reserve(21); boost::system::error_code ec; - if (socket_.is_open()) { - endpoint_type its_local_endpoint = socket_.local_endpoint(ec); + if (unicast_socket_.is_open()) { + endpoint_type its_local_endpoint = unicast_socket_.local_endpoint(ec); if (!ec) { its_address_port += its_local_endpoint.address().to_string(ec); its_address_port += ":"; @@ -500,4 +721,11 @@ const std::string udp_server_endpoint_impl::get_address_port_local() const { return its_address_port; } -} // namespace vsomeip +bool udp_server_endpoint_impl::tp_segmentation_enabled(service_t _service, + method_t _method) const { + return configuration_->tp_segment_messages_service_to_client(_service, + local_.address().to_string(), + local_.port(), _method); +} + +} // namespace vsomeip_v3 |