diff options
Diffstat (limited to 'implementation/endpoints/src/udp_server_endpoint_impl.cpp')
-rw-r--r-- | implementation/endpoints/src/udp_server_endpoint_impl.cpp | 617 |
1 files changed, 368 insertions, 249 deletions
diff --git a/implementation/endpoints/src/udp_server_endpoint_impl.cpp b/implementation/endpoints/src/udp_server_endpoint_impl.cpp index bd44b48..79dee1c 100644 --- a/implementation/endpoints/src/udp_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/udp_server_endpoint_impl.cpp @@ -1,12 +1,17 @@ -// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2021 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. #include <iomanip> #include <sstream> +#include <thread> #include <boost/asio/ip/multicast.hpp> +#if VSOMEIP_BOOST_VERSION >= 106600 +#include <boost/asio/ip/network_v4.hpp> +#include <boost/asio/ip/network_v6.hpp> +#endif #include <vsomeip/constants.hpp> #include <vsomeip/internal/logger.hpp> @@ -14,8 +19,9 @@ #include "../include/endpoint_definition.hpp" #include "../include/endpoint_host.hpp" #include "../include/tp.hpp" -#include "../../routing/include/routing_host.hpp" #include "../include/udp_server_endpoint_impl.hpp" +#include "../include/udp_server_endpoint_impl_receive_op.hpp" +#include "../../routing/include/routing_host.hpp" #include "../../configuration/include/configuration.hpp" #include "../../utility/include/byteorder.hpp" #include "../../utility/include/utility.hpp" @@ -29,16 +35,24 @@ udp_server_endpoint_impl::udp_server_endpoint_impl( const std::shared_ptr<endpoint_host>& _endpoint_host, const std::shared_ptr<routing_host>& _routing_host, const endpoint_type& _local, - boost::asio::io_service &_io, + boost::asio::io_context &_io, const std::shared_ptr<configuration>& _configuration) : - server_endpoint_impl<ip::udp_ext>(_endpoint_host, _routing_host, _local, - _io, VSOMEIP_MAX_UDP_MESSAGE_SIZE, - _configuration->get_endpoint_queue_limit(_configuration->get_unicast_address().to_string(), _local.port()), - _configuration), +#if VSOMEIP_BOOST_VERSION >= 106600 + server_endpoint_impl<ip::udp>( +#else + server_endpoint_impl<ip::udp_ext>( +#endif + _endpoint_host, _routing_host, _local, + _io, VSOMEIP_MAX_UDP_MESSAGE_SIZE, + _configuration->get_endpoint_queue_limit(_configuration->get_unicast_address().to_string(), _local.port()), + _configuration), unicast_socket_(_io, _local.protocol()), unicast_recv_buffer_(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0), + is_v4_(false), multicast_id_(0), joined_group_(false), + netmask_(_configuration->get_netmask()), + prefix_(_configuration->get_prefix()), local_port_(_local.port()), tp_reassembler_(std::make_shared<tp::tp_reassembler>(_configuration->get_max_message_size_unreliable(), _io)), tp_cleanup_timer_(_io) { @@ -48,9 +62,11 @@ udp_server_endpoint_impl::udp_server_endpoint_impl( boost::asio::socket_base::reuse_address optionReuseAddress(true); unicast_socket_.set_option(optionReuseAddress, ec); - boost::asio::detail::throw_error(ec, "reuse address"); + if (ec) + VSOMEIP_ERROR << __func__ + << ": set reuse address option failed (" << ec.message() << ")"; -#ifndef _WIN32 +#if defined(__linux__) || defined(ANDROID) // If specified, bind to device std::string its_device(configuration_->get_device()); if (its_device != "") { @@ -62,27 +78,37 @@ udp_server_endpoint_impl::udp_server_endpoint_impl( #endif unicast_socket_.bind(_local, ec); - boost::asio::detail::throw_error(ec, "bind"); + if (ec) + VSOMEIP_ERROR << __func__ + << ": bind failed (" << ec.message() << ")"; if (local_.address().is_v4()) { + is_v4_ = true; boost::asio::ip::multicast::outbound_interface option(_local.address().to_v4()); unicast_socket_.set_option(option, ec); - boost::asio::detail::throw_error(ec, "outbound interface option IPv4"); - } else if (local_.address().is_v6()) { + if (ec) + VSOMEIP_ERROR << __func__ + << ": set IPv4 outbound interface option failed (" << ec.message() << ")"; + } else { boost::asio::ip::multicast::outbound_interface option( static_cast<unsigned int>(local_.address().to_v6().scope_id())); unicast_socket_.set_option(option, ec); - boost::asio::detail::throw_error(ec, "outbound interface option IPv6"); + if (ec) + VSOMEIP_ERROR << __func__ + << ": set IPv6 outbound interface option failed (" << ec.message() << ")"; } boost::asio::socket_base::broadcast option(true); unicast_socket_.set_option(option, ec); - boost::asio::detail::throw_error(ec, "broadcast option"); + if (ec) + VSOMEIP_ERROR << __func__ + << ": set broadcast option failed (" << ec.message() << ")"; const int its_udp_recv_buffer_size = configuration_->get_udp_receive_buffer_size(); unicast_socket_.set_option(boost::asio::socket_base::receive_buffer_size( its_udp_recv_buffer_size), ec); + if (ec) { VSOMEIP_WARNING << "udp_server_endpoint_impl: couldn't set " << "SO_RCVBUF: " << ec.message() << " to: " << std::dec @@ -118,17 +144,6 @@ udp_server_endpoint_impl::udp_server_endpoint_impl( << " (" << its_udp_recv_buffer_size << ") local port:" << std::dec << local_port_; } - - -#ifdef _WIN32 - const char* optval("0001"); - ::setsockopt(unicast_socket_.native_handle(), IPPROTO_IP, IP_PKTINFO, - optval, sizeof(optval)); -#else - int optval(1); - ::setsockopt(unicast_socket_.native_handle(), IPPROTO_IP, IP_PKTINFO, - &optval, sizeof(optval)); -#endif } udp_server_endpoint_impl::~udp_server_endpoint_impl() { @@ -175,7 +190,7 @@ void udp_server_endpoint_impl::receive_unicast() { std::lock_guard<std::mutex> its_lock(unicast_mutex_); - if(unicast_socket_.is_open()) { + if (unicast_socket_.is_open()) { unicast_socket_.async_receive_from( boost::asio::buffer(&unicast_recv_buffer_[0], max_message_size_), unicast_remote_, @@ -184,8 +199,7 @@ void udp_server_endpoint_impl::receive_unicast() { std::dynamic_pointer_cast< udp_server_endpoint_impl >(shared_from_this()), std::placeholders::_1, - std::placeholders::_2, - std::placeholders::_3 + std::placeholders::_2 ) ); } @@ -197,8 +211,9 @@ void udp_server_endpoint_impl::receive_unicast() { void udp_server_endpoint_impl::receive_multicast(uint8_t _multicast_id) { if (_multicast_id == multicast_id_ && multicast_socket_ && multicast_socket_->is_open()) { - multicast_socket_->async_receive_from( - boost::asio::buffer(&multicast_recv_buffer_[0], max_message_size_), +#if VSOMEIP_BOOST_VERSION >= 106600 + udp_server_endpoint_impl_receive_op its_operation { + *multicast_socket_, multicast_remote_, std::bind( &udp_server_endpoint_impl::on_multicast_received, @@ -207,9 +222,31 @@ void udp_server_endpoint_impl::receive_multicast(uint8_t _multicast_id) { std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, - _multicast_id + std::placeholders::_4 + ), + &multicast_recv_buffer_[0], + max_message_size_, + _multicast_id, + is_v4_, + boost::asio::ip::address(), + std::numeric_limits<size_t>::min() + }; + multicast_socket_->async_wait(socket_type::wait_read, its_operation); +#else + multicast_socket_->async_receive_from( + boost::asio::buffer(&multicast_recv_buffer_[0], max_message_size_), + multicast_remote_, + std::bind( + &udp_server_endpoint_impl::on_multicast_received, + std::dynamic_pointer_cast< + udp_server_endpoint_impl >(shared_from_this()), + std::placeholders::_1, + std::placeholders::_2, + _multicast_id, + std::placeholders::_3 ) ); +#endif } } @@ -229,50 +266,68 @@ bool udp_server_endpoint_impl::send_error( bool ret(false); std::lock_guard<std::mutex> its_lock(mutex_); const endpoint_type its_target(_target->get_address(), _target->get_port()); - const queue_iterator_type target_queue_iterator(find_or_create_queue_unlocked(its_target)); - auto& its_qpair = target_queue_iterator->second; - const bool queue_size_zero_on_entry(its_qpair.second.empty()); + const auto its_target_iterator(find_or_create_target_unlocked(its_target)); + auto& its_data = its_target_iterator->second; + const bool queue_size_zero_on_entry(its_data.queue_.empty()); if (check_message_size(nullptr, _size, its_target) == endpoint_impl::cms_ret_e::MSG_OK && - check_queue_limit(_data, _size, its_qpair.first)) { - its_qpair.second.emplace_back( - std::make_shared<message_buffer_t>(_data, _data + _size)); - its_qpair.first += _size; + check_queue_limit(_data, _size, its_data.queue_size_)) { + its_data.queue_.emplace_back( + std::make_pair(std::make_shared<message_buffer_t>(_data, _data + _size), 0)); + its_data.queue_size_ += _size; if (queue_size_zero_on_entry) { // no writing in progress - send_queued(target_queue_iterator); + (void)send_queued(its_target_iterator); } ret = true; } return ret; } -void udp_server_endpoint_impl::send_queued( - const queue_iterator_type _queue_iterator) { +bool udp_server_endpoint_impl::send_queued( + const target_data_iterator_type _it) { - message_buffer_ptr_t its_buffer = _queue_iterator->second.second.front(); + static std::chrono::steady_clock::time_point its_last_sent; + const auto its_entry = _it->second.queue_.front(); #if 0 - std::stringstream msg; - msg << "usei::sq(" << _queue_iterator->first.address().to_string() << ":" - << _queue_iterator->first.port() << "): "; - for (std::size_t i = 0; i < its_buffer->size(); ++i) - msg << std::hex << std::setw(2) << std::setfill('0') - << (int)(*its_buffer)[i] << " "; - VSOMEIP_INFO << msg.str(); + std::stringstream msg; + msg << "usei::sq(" << _queue_iterator->first.address().to_string() << ":" + << _queue_iterator->first.port() << "): "; + for (std::size_t i = 0; i < its_buffer->size(); ++i) + msg << std::hex << std::setw(2) << std::setfill('0') + << (int)(*its_entry.first)[i] << " "; + VSOMEIP_INFO << msg.str(); #endif std::lock_guard<std::mutex> its_lock(unicast_mutex_); + // Check whether we need to wait (SOME/IP-TP separation time) + if (its_entry.second > 0) { + if (its_last_sent != std::chrono::steady_clock::time_point()) { + const auto its_elapsed + = std::chrono::duration_cast<std::chrono::microseconds>( + std::chrono::steady_clock::now() - its_last_sent).count(); + if (its_entry.second > its_elapsed) + std::this_thread::sleep_for( + std::chrono::microseconds(its_entry.second - its_elapsed)); + } + its_last_sent = std::chrono::steady_clock::now(); + } else { + its_last_sent = std::chrono::steady_clock::time_point(); + } + unicast_socket_.async_send_to( - boost::asio::buffer(*its_buffer), - _queue_iterator->first, + boost::asio::buffer(*its_entry.first), + _it->first, std::bind( &udp_server_endpoint_base_impl::send_cbk, shared_from_this(), - _queue_iterator, + _it, std::placeholders::_1, std::placeholders::_2 ) ); + + return false; } void udp_server_endpoint_impl::get_configured_times_from_endpoint( @@ -325,126 +380,18 @@ void udp_server_endpoint_impl::join_unlocked(const std::string &_address) { VSOMEIP_DEBUG << "Joining to multicast group " << _address << " from " << local_.address().to_string(); - boost::system::error_code ec; - - bool is_v4(false); - bool is_v6(false); - { - std::lock_guard<std::mutex> its_lock(local_mutex_); - is_v4 = local_.address().is_v4(); - is_v6 = local_.address().is_v6(); - } - - if (multicast_recv_buffer_.empty()) - multicast_recv_buffer_.resize(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0); - - if (!multicast_local_) { - if (is_v4) { - multicast_local_ = std::unique_ptr<endpoint_type>( - new endpoint_type(boost::asio::ip::address_v4::any(), local_port_)); - } - if (is_v6) { - multicast_local_ = std::unique_ptr<endpoint_type>( - new endpoint_type(boost::asio::ip::address_v6::any(), local_port_)); - } - } - - if (!multicast_socket_) { - multicast_socket_ = std::unique_ptr<socket_type>( - new socket_type(service_, local_.protocol())); - - boost::asio::socket_base::reuse_address optionReuseAddress(true); - multicast_socket_->set_option(optionReuseAddress, ec); - boost::asio::detail::throw_error(ec, "reuse address in multicast"); - boost::asio::socket_base::broadcast optionBroadcast(true); - multicast_socket_->set_option(optionBroadcast, ec); - boost::asio::detail::throw_error(ec, "set broadcast option"); - - multicast_socket_->bind(*multicast_local_, ec); - boost::asio::detail::throw_error(ec, "bind multicast"); - - const int its_udp_recv_buffer_size = - configuration_->get_udp_receive_buffer_size(); - multicast_socket_->set_option(boost::asio::socket_base::receive_buffer_size( - its_udp_recv_buffer_size), ec); - if (ec) { - VSOMEIP_WARNING << "udp_server_endpoint_impl<multicast>: couldn't set " - << "SO_RCVBUF: " << ec.message() << " to: " << std::dec - << its_udp_recv_buffer_size << " local port: " << std::dec - << local_port_; - } - - boost::asio::socket_base::receive_buffer_size its_option; - multicast_socket_->get_option(its_option, ec); - #ifdef __linux__ - // If regular setting of the buffer size did not work, try to force - // (requires CAP_NET_ADMIN to be successful) - if (its_option.value() < 0 - || its_option.value() < its_udp_recv_buffer_size) { - ec.assign(setsockopt(multicast_socket_->native_handle(), - SOL_SOCKET, SO_RCVBUFFORCE, - &its_udp_recv_buffer_size, sizeof(its_udp_recv_buffer_size)), - boost::system::generic_category()); - if (!ec) { - VSOMEIP_INFO << "udp_server_endpoint_impl<multicast>: " - << "SO_RCVBUFFORCE: successful."; - } - multicast_socket_->get_option(its_option, ec); - } - #endif - if (ec) { - VSOMEIP_WARNING << "udp_server_endpoint_impl<multicast>: couldn't get " - << "SO_RCVBUF: " << ec.message() << " local port:" - << std::dec << local_port_; - } else { - VSOMEIP_INFO << "udp_server_endpoint_impl<multicast>: SO_RCVBUF is: " - << std::dec << its_option.value() - << " (" << its_udp_recv_buffer_size << ") local port:" - << std::dec << local_port_; - } - -#ifdef _WIN32 - const char* optval("0001"); - if (is_v4) { - ::setsockopt(multicast_socket_->native_handle(), IPPROTO_IP, IP_PKTINFO, - optval, sizeof(optval)); - } else if (is_v6) { - ::setsockopt(multicast_socket_->native_handle(), IPPROTO_IPV6, IPV6_PKTINFO, - optval, sizeof(optval)); - } + auto its_endpoint_host = endpoint_host_.lock(); + if (its_endpoint_host) { + multicast_option_t its_join_option { shared_from_this(), true, +#if VSOMEIP_BOOST_VERSION < 106600 + boost::asio::ip::address::from_string(_address) }; #else - int optval(1); - if (is_v4) { - ::setsockopt(multicast_socket_->native_handle(), IPPROTO_IP, IP_PKTINFO, - &optval, sizeof(optval)); - } else { - ::setsockopt(multicast_socket_->native_handle(), IPPROTO_IPV6, IPV6_RECVPKTINFO, - &optval, sizeof(optval)); - } + boost::asio::ip::make_address(_address) }; #endif - multicast_id_++; - receive_multicast(multicast_id_); - } - - if (is_v4) { - multicast_socket_->set_option(ip::udp_ext::socket::reuse_address(true)); - multicast_socket_->set_option( - boost::asio::ip::multicast::enable_loopback(false)); - multicast_socket_->set_option(boost::asio::ip::multicast::join_group( - boost::asio::ip::address::from_string(_address).to_v4(), - local_.address().to_v4())); - } else if (is_v6) { - multicast_socket_->set_option(ip::udp_ext::socket::reuse_address(true)); - multicast_socket_->set_option( - boost::asio::ip::multicast::enable_loopback(false)); - multicast_socket_->set_option(boost::asio::ip::multicast::join_group( - boost::asio::ip::address::from_string(_address).to_v6(), - local_.address().to_v6().scope_id())); + its_endpoint_host->add_multicast_option(its_join_option); } joined_[_address] = false; - joined_group_ = true; - } catch (const std::exception &e) { VSOMEIP_ERROR << "udp_server_endpoint_impl::join" << ":" << e.what() << " address: " << _address; @@ -473,31 +420,20 @@ void udp_server_endpoint_impl::leave_unlocked(const std::string &_address) { VSOMEIP_DEBUG << "Leaving the multicast group " << _address << " from " << local_.address().to_string(); - bool is_v4(false); - bool is_v6(false); - { - std::lock_guard<std::mutex> its_lock(local_mutex_); - is_v4 = local_.address().is_v4(); - is_v6 = local_.address().is_v6(); - } - if (is_v4) { - multicast_socket_->set_option(boost::asio::ip::multicast::leave_group( - boost::asio::ip::address::from_string(_address))); - } else if (is_v6) { - multicast_socket_->set_option(boost::asio::ip::multicast::leave_group( - boost::asio::ip::address::from_string(_address))); + if (multicast_socket_) { + auto its_endpoint_host = endpoint_host_.lock(); + if (its_endpoint_host) { + multicast_option_t its_leave_option { shared_from_this(), +#if VSOMEIP_BOOST_VERSION < 106600 + false, boost::asio::ip::address::from_string(_address) }; +#else + false, boost::asio::ip::make_address(_address) }; +#endif + its_endpoint_host->add_multicast_option(its_leave_option); + } } joined_.erase(_address); - if (0 == joined_.size()) { - joined_group_ = false; - - boost::system::error_code ec; - multicast_socket_->cancel(ec); - - multicast_socket_.reset(nullptr); - multicast_local_.reset(nullptr); - } } } catch (const std::exception &e) { @@ -541,8 +477,7 @@ void udp_server_endpoint_impl::set_local_port(std::uint16_t _port) { void udp_server_endpoint_impl::on_unicast_received( boost::system::error_code const &_error, - std::size_t _bytes, - boost::asio::ip::address const &_destination) { + std::size_t _bytes) { if (_error != boost::asio::error::operation_aborted) { { @@ -550,7 +485,7 @@ void udp_server_endpoint_impl::on_unicast_received( // & multicast messages are not processed in parallel. This aligns // the behavior of endpoints with one and two active sockets. std::lock_guard<std::mutex> its_lock(multicast_mutex_); - on_message_received(_error, _bytes, _destination, + on_message_received(_error, _bytes, false, unicast_remote_, unicast_recv_buffer_); } receive_unicast(); @@ -560,14 +495,20 @@ void udp_server_endpoint_impl::on_unicast_received( void udp_server_endpoint_impl::on_multicast_received( boost::system::error_code const &_error, std::size_t _bytes, - boost::asio::ip::address const &_destination, - uint8_t _multicast_id) { + uint8_t _multicast_id, + const boost::asio::ip::address &_destination) { std::lock_guard<std::mutex> its_lock(multicast_mutex_); if (_error != boost::asio::error::operation_aborted) { // Filter messages sent from the same source address - if (multicast_remote_.address() != local_.address()) { - on_message_received(_error, _bytes, _destination, + if (multicast_remote_.address() != local_.address() + && is_same_subnet(multicast_remote_.address())) { + + auto find_joined = joined_.find(_destination.to_string()); + if (find_joined != joined_.end()) + find_joined->second = true; + + on_message_received(_error, _bytes, true, multicast_remote_, multicast_recv_buffer_); } @@ -577,7 +518,7 @@ void udp_server_endpoint_impl::on_multicast_received( void udp_server_endpoint_impl::on_message_received( boost::system::error_code const &_error, std::size_t _bytes, - boost::asio::ip::address const &_destination, + bool _is_multicast, endpoint_type const &_remote, message_buffer_t const &_buffer) { #if 0 @@ -585,7 +526,7 @@ void udp_server_endpoint_impl::on_message_received( msg << "usei::rcb(" << _error.message() << "): "; for (std::size_t i = 0; i < _bytes; ++i) msg << std::hex << std::setw(2) << std::setfill('0') - << (int) recv_buffer_[i] << " "; + << (int) _buffer[i] << " "; VSOMEIP_INFO << msg.str(); #endif std::shared_ptr<routing_host> its_host = routing_host_.lock(); @@ -613,7 +554,7 @@ void udp_server_endpoint_impl::on_message_received( } else if (current_message_size > VSOMEIP_RETURN_CODE_POS && (_buffer[i + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION || !utility::is_valid_message_type(tp::tp::tp_flag_unset(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS])) || - /*!utility::is_valid_return_code(static_cast<return_code_e>(_buffer[i + VSOMEIP_RETURN_CODE_POS])) ||*/ + !utility::is_valid_return_code(static_cast<return_code_e>(_buffer[i + VSOMEIP_RETURN_CODE_POS])) || (tp::tp::tp_flag_is_set(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS]) && get_local_port() == configuration_->get_sd_port()) )) { if (_buffer[i + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION) { @@ -625,9 +566,9 @@ void udp_server_endpoint_impl::on_message_received( // ensure to send back a message w/ wrong protocol version its_host->on_message(&_buffer[i], VSOMEIP_SOMEIP_HEADER_SIZE + 8, this, - _destination, + _is_multicast, VSOMEIP_ROUTING_CLIENT, - std::make_pair(ANY_UID, ANY_GID), + nullptr, its_remote_address, its_remote_port); } else if (!utility::is_valid_message_type(tp::tp::tp_flag_unset( _buffer[i + VSOMEIP_MESSAGE_TYPE_POS]))) { @@ -663,21 +604,9 @@ void udp_server_endpoint_impl::on_message_received( const session_t its_session = VSOMEIP_BYTES_TO_WORD( _buffer[i + VSOMEIP_SESSION_POS_MIN], _buffer[i + VSOMEIP_SESSION_POS_MAX]); - const method_t its_method = VSOMEIP_BYTES_TO_WORD( - _buffer[i + VSOMEIP_METHOD_POS_MIN], - _buffer[i + VSOMEIP_METHOD_POS_MAX]); - - std::lock_guard<std::mutex> its_requests_guard(requests_mutex_); - requests_[its_client] - [std::make_tuple(its_service, its_method, its_session)] = _remote; - } - } else if (its_service != VSOMEIP_SD_SERVICE - && utility::is_notification(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS]) - && joined_group_) { - boost::system::error_code ec; - const auto found_address = joined_.find(_destination.to_string(ec)); - if (found_address != joined_.end()) { - found_address->second = true; + clients_mutex_.lock(); + clients_[its_client][its_session] = _remote; + clients_mutex_.unlock(); } } if (tp::tp::tp_flag_is_set(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS])) { @@ -699,33 +628,17 @@ void udp_server_endpoint_impl::on_message_received( res.second[VSOMEIP_CLIENT_POS_MIN], res.second[VSOMEIP_CLIENT_POS_MAX]); if (its_client != MAGIC_COOKIE_CLIENT) { - const service_t its_service = VSOMEIP_BYTES_TO_WORD( - res.second[VSOMEIP_SERVICE_POS_MIN], - res.second[VSOMEIP_SERVICE_POS_MAX]); - const method_t its_method = VSOMEIP_BYTES_TO_WORD( - res.second[VSOMEIP_METHOD_POS_MIN], - res.second[VSOMEIP_METHOD_POS_MAX]); const session_t its_session = VSOMEIP_BYTES_TO_WORD( res.second[VSOMEIP_SESSION_POS_MIN], res.second[VSOMEIP_SESSION_POS_MAX]); - - std::lock_guard<std::mutex> its_requests_guard(requests_mutex_); - requests_[its_client] - [std::make_tuple(its_service, its_method, its_session)] = _remote; - } - } else if (its_service != VSOMEIP_SD_SERVICE - && utility::is_notification(res.second[VSOMEIP_MESSAGE_TYPE_POS]) - && joined_group_) { - boost::system::error_code ec; - const auto found_address = joined_.find(_destination.to_string(ec)); - if (found_address != joined_.end()) { - found_address->second = true; + std::lock_guard<std::mutex> its_client_lock(clients_mutex_); + clients_[its_client][its_session] = _remote; } } its_host->on_message(&res.second[0], static_cast<std::uint32_t>(res.second.size()), - this, _destination, VSOMEIP_ROUTING_CLIENT, - std::make_pair(ANY_UID, ANY_GID), + this, _is_multicast, VSOMEIP_ROUTING_CLIENT, + nullptr, its_remote_address, its_remote_port); } } else { @@ -733,9 +646,9 @@ void udp_server_endpoint_impl::on_message_received( (current_message_size > VSOMEIP_SOMEIP_HEADER_SIZE && current_message_size >= remaining_bytes)) { its_host->on_message(&_buffer[i], - current_message_size, this, _destination, + current_message_size, this, _is_multicast, VSOMEIP_ROUTING_CLIENT, - std::make_pair(ANY_UID, ANY_GID), + nullptr, its_remote_address, its_remote_port); } else { //ignore messages for service discovery with shorter SomeIP length @@ -772,21 +685,63 @@ void udp_server_endpoint_impl::on_message_received( } } +bool udp_server_endpoint_impl::is_same_subnet(const boost::asio::ip::address &_address) const { + + bool is_same(true); +#if VSOMEIP_BOOST_VERSION < 106600 + // TODO: This needs some (more) testing + if (_address.is_v4()) { + uint32_t its_local(uint32_t(local_.address().to_v4().to_ulong())); + uint32_t its_mask(uint32_t(netmask_.to_v4().to_ulong())); + uint32_t its_address(uint32_t(_address.to_v4().to_ulong())); + + return ((its_local & its_mask) == (its_address & its_mask)); + } else { + boost::asio::ip::address_v6::bytes_type its_local(local_.address().to_v6().to_bytes()); + boost::asio::ip::address_v6::bytes_type its_address(_address.to_v6().to_bytes()); + + for (size_t i = 0; i < its_local.size(); ++i) { + byte_t its_mask(0x00); + if ((i+1) * sizeof(byte_t) <= prefix_) + its_mask = 0xff; + else if (i <= prefix_) + its_mask = byte_t(0xff << (((i+1) * sizeof(byte_t)) - prefix_)); + + if ((its_local[i] & its_mask) != (its_address[i] & its_mask)) + return (false); + } + + return (true); + } +#else + if (_address.is_v4()) { + boost::asio::ip::network_v4 its_network(local_.address().to_v4(), netmask_.to_v4()); + boost::asio::ip::address_v4_range its_hosts = its_network.hosts(); + is_same = (its_hosts.find(_address.to_v4()) != its_hosts.end()); + } else { + boost::asio::ip::network_v6 its_network(local_.address().to_v6(), prefix_); + boost::asio::ip::address_v6_range its_hosts = its_network.hosts(); + is_same = (its_hosts.find(_address.to_v6()) != its_hosts.end()); + } +#endif + return (is_same); +} + void udp_server_endpoint_impl::print_status() { std::lock_guard<std::mutex> its_lock(mutex_); VSOMEIP_INFO << "status use: " << std::dec << local_port_ - << " number queues: " << std::dec << queues_.size() + << " number targets: " << std::dec << targets_.size() << " recv_buffer: " << std::dec << unicast_recv_buffer_.capacity() << " multicast_recv_buffer: " << std::dec << multicast_recv_buffer_.capacity(); - for (const auto &c : queues_) { + for (const auto &c : targets_) { std::size_t its_data_size(0); std::size_t its_queue_size(0); - its_queue_size = c.second.second.size(); - its_data_size = c.second.first; + its_queue_size = c.second.queue_.size(); + its_data_size = c.second.queue_size_; boost::system::error_code ec; VSOMEIP_INFO << "status use: client: " @@ -798,14 +753,16 @@ void udp_server_endpoint_impl::print_status() { } std::string udp_server_endpoint_impl::get_remote_information( - const queue_iterator_type _queue_iterator) const { + const target_data_iterator_type _it) const { + boost::system::error_code ec; - return _queue_iterator->first.address().to_string(ec) + ":" - + std::to_string(_queue_iterator->first.port()); + return _it->first.address().to_string(ec) + ":" + + std::to_string(_it->first.port()); } std::string udp_server_endpoint_impl::get_remote_information( const endpoint_type& _remote) const { + boost::system::error_code ec; return _remote.address().to_string(ec) + ":" + std::to_string(_remote.port()); @@ -815,7 +772,7 @@ bool udp_server_endpoint_impl::is_reliable() const { return false; } -const std::string udp_server_endpoint_impl::get_address_port_local() const { +std::string udp_server_endpoint_impl::get_address_port_local() const { std::lock_guard<std::mutex> its_lock(unicast_mutex_); std::string its_address_port; @@ -835,9 +792,171 @@ const std::string udp_server_endpoint_impl::get_address_port_local() const { bool udp_server_endpoint_impl::tp_segmentation_enabled( service_t _service, method_t _method) const { - return configuration_->tp_segment_messages_service_to_client(_service, - local_.address().to_string(), - local_.port(), _method); + return configuration_->is_tp_service(_service, + local_.address().to_string(), local_.port(), + _method); +} + +void +udp_server_endpoint_impl::set_multicast_option( + const boost::asio::ip::address &_address, bool _is_join) { + + boost::system::error_code ec; + + if (_is_join) { + if (!multicast_socket_) { + std::lock_guard<std::mutex> its_guard(multicast_mutex_); + + multicast_socket_ = std::unique_ptr<socket_type>( + new socket_type(io_, local_.protocol())); + + multicast_socket_->set_option(ip::udp::socket::reuse_address(true), ec); + if (ec) + VSOMEIP_ERROR << __func__ + << ": set reuse address option failed (" << ec.message() << ")"; + +#ifdef _WIN32 + const char *its_option("0001"); + ::setsockopt(multicast_socket_->native_handle(), + (is_v4_ ? IPPROTO_IP : IPPROTO_IPV6), + (is_v4_ ? IP_PKTINFO : IPV6_PKTINFO), + its_option, sizeof(its_option)); +#else + int its_pktinfo_option(1); + ::setsockopt(multicast_socket_->native_handle(), + (is_v4_ ? IPPROTO_IP : IPPROTO_IPV6), + (is_v4_ ? IP_PKTINFO : IPV6_PKTINFO), + &its_pktinfo_option, sizeof(its_pktinfo_option)); +#endif + + if (multicast_recv_buffer_.empty()) + multicast_recv_buffer_.resize(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0); + + if (!multicast_local_) { + if (is_v4_) { + multicast_local_ = std::unique_ptr<endpoint_type>( + new endpoint_type(boost::asio::ip::address_v4::any(), local_port_)); + } else { // is_v6 + multicast_local_ = std::unique_ptr<endpoint_type>( + new endpoint_type(boost::asio::ip::address_v6::any(), local_port_)); + } + } + + multicast_socket_->bind(*multicast_local_, ec); + if (ec) + VSOMEIP_ERROR << __func__ + << ": bind failed (" << ec.message() << ")"; + + const int its_udp_recv_buffer_size = + configuration_->get_udp_receive_buffer_size(); + + multicast_socket_->set_option(boost::asio::socket_base::receive_buffer_size( + its_udp_recv_buffer_size), ec); + +#ifndef _WIN32 + // define socket timeout + struct timeval timeout; + timeout.tv_sec = 0; + timeout.tv_usec = VSOMEIP_SETSOCKOPT_TIMEOUT_US; + + if (setsockopt( + multicast_socket_->native_handle(), + SOL_SOCKET, SO_RCVTIMEO, + &timeout, sizeof(timeout)) == -1) { + VSOMEIP_WARNING << __func__ + << ": unable to setsockopt SO_RCVTIMEO"; + } + + if (setsockopt( + multicast_socket_->native_handle(), + SOL_SOCKET, SO_SNDTIMEO, + &timeout, sizeof(timeout)) == -1) { + VSOMEIP_WARNING << __func__ + << ": unable to setsockopt SO_SNDTIMEO"; + } +#endif + + if (ec) { + VSOMEIP_WARNING << "udp_server_endpoint_impl<multicast>: couldn't set " + << "SO_RCVBUF: " << ec.message() << " to: " << std::dec + << its_udp_recv_buffer_size << " local port: " << std::dec + << local_port_; + } + + boost::asio::socket_base::receive_buffer_size its_option; + multicast_socket_->get_option(its_option, ec); +#ifdef __linux__ + // If regular setting of the buffer size did not work, try to force + // (requires CAP_NET_ADMIN to be successful) + if (its_option.value() < 0 + || its_option.value() < its_udp_recv_buffer_size) { + ec.assign(setsockopt(multicast_socket_->native_handle(), + SOL_SOCKET, SO_RCVBUFFORCE, + &its_udp_recv_buffer_size, sizeof(its_udp_recv_buffer_size)), + boost::system::generic_category()); + if (!ec) { + VSOMEIP_INFO << "udp_server_endpoint_impl<multicast>: " + << "SO_RCVBUFFORCE: successful."; + } + multicast_socket_->get_option(its_option, ec); + } +#endif + if (ec) { + VSOMEIP_WARNING << "udp_server_endpoint_impl<multicast>: couldn't get " + << "SO_RCVBUF: " << ec.message() << " local port:" + << std::dec << local_port_; + } else { + VSOMEIP_INFO << "udp_server_endpoint_impl<multicast>: SO_RCVBUF is: " + << std::dec << its_option.value() + << " (" << its_udp_recv_buffer_size << ") local port:" + << std::dec << local_port_; + } + + multicast_id_++; + receive_multicast(multicast_id_); + } + + boost::asio::ip::multicast::join_group its_join_option; + { + std::lock_guard<std::mutex> its_lock(local_mutex_); + if (is_v4_) { + + its_join_option = boost::asio::ip::multicast::join_group( + _address.to_v4(), + local_.address().to_v4()); + } else { + its_join_option = boost::asio::ip::multicast::join_group( + _address.to_v6(), + static_cast<unsigned int>(local_.address().to_v6().scope_id())); + } + } + multicast_socket_->set_option(its_join_option, ec); + + if (!ec) { + std::lock_guard<std::mutex> its_guard(multicast_mutex_); + joined_[_address.to_string()] = false; + joined_group_ = true; + } + } else { + if (multicast_socket_) { + boost::asio::ip::multicast::leave_group its_leave_option(_address); + multicast_socket_->set_option(its_leave_option, ec); + + if (!ec) { + std::lock_guard<std::mutex> its_guard(multicast_mutex_); + joined_.erase(_address.to_string()); + + if (0 == joined_.size()) { + joined_group_ = false; + + multicast_socket_->cancel(ec); + + multicast_socket_.reset(nullptr); + multicast_local_.reset(nullptr); + } + } + } + } } } // namespace vsomeip_v3 |