summaryrefslogtreecommitdiff
path: root/implementation/endpoints/src/udp_server_endpoint_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/endpoints/src/udp_server_endpoint_impl.cpp')
-rw-r--r--implementation/endpoints/src/udp_server_endpoint_impl.cpp617
1 files changed, 368 insertions, 249 deletions
diff --git a/implementation/endpoints/src/udp_server_endpoint_impl.cpp b/implementation/endpoints/src/udp_server_endpoint_impl.cpp
index bd44b48..79dee1c 100644
--- a/implementation/endpoints/src/udp_server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/udp_server_endpoint_impl.cpp
@@ -1,12 +1,17 @@
-// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2014-2021 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
#include <iomanip>
#include <sstream>
+#include <thread>
#include <boost/asio/ip/multicast.hpp>
+#if VSOMEIP_BOOST_VERSION >= 106600
+#include <boost/asio/ip/network_v4.hpp>
+#include <boost/asio/ip/network_v6.hpp>
+#endif
#include <vsomeip/constants.hpp>
#include <vsomeip/internal/logger.hpp>
@@ -14,8 +19,9 @@
#include "../include/endpoint_definition.hpp"
#include "../include/endpoint_host.hpp"
#include "../include/tp.hpp"
-#include "../../routing/include/routing_host.hpp"
#include "../include/udp_server_endpoint_impl.hpp"
+#include "../include/udp_server_endpoint_impl_receive_op.hpp"
+#include "../../routing/include/routing_host.hpp"
#include "../../configuration/include/configuration.hpp"
#include "../../utility/include/byteorder.hpp"
#include "../../utility/include/utility.hpp"
@@ -29,16 +35,24 @@ udp_server_endpoint_impl::udp_server_endpoint_impl(
const std::shared_ptr<endpoint_host>& _endpoint_host,
const std::shared_ptr<routing_host>& _routing_host,
const endpoint_type& _local,
- boost::asio::io_service &_io,
+ boost::asio::io_context &_io,
const std::shared_ptr<configuration>& _configuration) :
- server_endpoint_impl<ip::udp_ext>(_endpoint_host, _routing_host, _local,
- _io, VSOMEIP_MAX_UDP_MESSAGE_SIZE,
- _configuration->get_endpoint_queue_limit(_configuration->get_unicast_address().to_string(), _local.port()),
- _configuration),
+#if VSOMEIP_BOOST_VERSION >= 106600
+ server_endpoint_impl<ip::udp>(
+#else
+ server_endpoint_impl<ip::udp_ext>(
+#endif
+ _endpoint_host, _routing_host, _local,
+ _io, VSOMEIP_MAX_UDP_MESSAGE_SIZE,
+ _configuration->get_endpoint_queue_limit(_configuration->get_unicast_address().to_string(), _local.port()),
+ _configuration),
unicast_socket_(_io, _local.protocol()),
unicast_recv_buffer_(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0),
+ is_v4_(false),
multicast_id_(0),
joined_group_(false),
+ netmask_(_configuration->get_netmask()),
+ prefix_(_configuration->get_prefix()),
local_port_(_local.port()),
tp_reassembler_(std::make_shared<tp::tp_reassembler>(_configuration->get_max_message_size_unreliable(), _io)),
tp_cleanup_timer_(_io) {
@@ -48,9 +62,11 @@ udp_server_endpoint_impl::udp_server_endpoint_impl(
boost::asio::socket_base::reuse_address optionReuseAddress(true);
unicast_socket_.set_option(optionReuseAddress, ec);
- boost::asio::detail::throw_error(ec, "reuse address");
+ if (ec)
+ VSOMEIP_ERROR << __func__
+ << ": set reuse address option failed (" << ec.message() << ")";
-#ifndef _WIN32
+#if defined(__linux__) || defined(ANDROID)
// If specified, bind to device
std::string its_device(configuration_->get_device());
if (its_device != "") {
@@ -62,27 +78,37 @@ udp_server_endpoint_impl::udp_server_endpoint_impl(
#endif
unicast_socket_.bind(_local, ec);
- boost::asio::detail::throw_error(ec, "bind");
+ if (ec)
+ VSOMEIP_ERROR << __func__
+ << ": bind failed (" << ec.message() << ")";
if (local_.address().is_v4()) {
+ is_v4_ = true;
boost::asio::ip::multicast::outbound_interface option(_local.address().to_v4());
unicast_socket_.set_option(option, ec);
- boost::asio::detail::throw_error(ec, "outbound interface option IPv4");
- } else if (local_.address().is_v6()) {
+ if (ec)
+ VSOMEIP_ERROR << __func__
+ << ": set IPv4 outbound interface option failed (" << ec.message() << ")";
+ } else {
boost::asio::ip::multicast::outbound_interface option(
static_cast<unsigned int>(local_.address().to_v6().scope_id()));
unicast_socket_.set_option(option, ec);
- boost::asio::detail::throw_error(ec, "outbound interface option IPv6");
+ if (ec)
+ VSOMEIP_ERROR << __func__
+ << ": set IPv6 outbound interface option failed (" << ec.message() << ")";
}
boost::asio::socket_base::broadcast option(true);
unicast_socket_.set_option(option, ec);
- boost::asio::detail::throw_error(ec, "broadcast option");
+ if (ec)
+ VSOMEIP_ERROR << __func__
+ << ": set broadcast option failed (" << ec.message() << ")";
const int its_udp_recv_buffer_size =
configuration_->get_udp_receive_buffer_size();
unicast_socket_.set_option(boost::asio::socket_base::receive_buffer_size(
its_udp_recv_buffer_size), ec);
+
if (ec) {
VSOMEIP_WARNING << "udp_server_endpoint_impl: couldn't set "
<< "SO_RCVBUF: " << ec.message() << " to: " << std::dec
@@ -118,17 +144,6 @@ udp_server_endpoint_impl::udp_server_endpoint_impl(
<< " (" << its_udp_recv_buffer_size << ") local port:"
<< std::dec << local_port_;
}
-
-
-#ifdef _WIN32
- const char* optval("0001");
- ::setsockopt(unicast_socket_.native_handle(), IPPROTO_IP, IP_PKTINFO,
- optval, sizeof(optval));
-#else
- int optval(1);
- ::setsockopt(unicast_socket_.native_handle(), IPPROTO_IP, IP_PKTINFO,
- &optval, sizeof(optval));
-#endif
}
udp_server_endpoint_impl::~udp_server_endpoint_impl() {
@@ -175,7 +190,7 @@ void udp_server_endpoint_impl::receive_unicast() {
std::lock_guard<std::mutex> its_lock(unicast_mutex_);
- if(unicast_socket_.is_open()) {
+ if (unicast_socket_.is_open()) {
unicast_socket_.async_receive_from(
boost::asio::buffer(&unicast_recv_buffer_[0], max_message_size_),
unicast_remote_,
@@ -184,8 +199,7 @@ void udp_server_endpoint_impl::receive_unicast() {
std::dynamic_pointer_cast<
udp_server_endpoint_impl >(shared_from_this()),
std::placeholders::_1,
- std::placeholders::_2,
- std::placeholders::_3
+ std::placeholders::_2
)
);
}
@@ -197,8 +211,9 @@ void udp_server_endpoint_impl::receive_unicast() {
void udp_server_endpoint_impl::receive_multicast(uint8_t _multicast_id) {
if (_multicast_id == multicast_id_ && multicast_socket_ && multicast_socket_->is_open()) {
- multicast_socket_->async_receive_from(
- boost::asio::buffer(&multicast_recv_buffer_[0], max_message_size_),
+#if VSOMEIP_BOOST_VERSION >= 106600
+ udp_server_endpoint_impl_receive_op its_operation {
+ *multicast_socket_,
multicast_remote_,
std::bind(
&udp_server_endpoint_impl::on_multicast_received,
@@ -207,9 +222,31 @@ void udp_server_endpoint_impl::receive_multicast(uint8_t _multicast_id) {
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3,
- _multicast_id
+ std::placeholders::_4
+ ),
+ &multicast_recv_buffer_[0],
+ max_message_size_,
+ _multicast_id,
+ is_v4_,
+ boost::asio::ip::address(),
+ std::numeric_limits<size_t>::min()
+ };
+ multicast_socket_->async_wait(socket_type::wait_read, its_operation);
+#else
+ multicast_socket_->async_receive_from(
+ boost::asio::buffer(&multicast_recv_buffer_[0], max_message_size_),
+ multicast_remote_,
+ std::bind(
+ &udp_server_endpoint_impl::on_multicast_received,
+ std::dynamic_pointer_cast<
+ udp_server_endpoint_impl >(shared_from_this()),
+ std::placeholders::_1,
+ std::placeholders::_2,
+ _multicast_id,
+ std::placeholders::_3
)
);
+#endif
}
}
@@ -229,50 +266,68 @@ bool udp_server_endpoint_impl::send_error(
bool ret(false);
std::lock_guard<std::mutex> its_lock(mutex_);
const endpoint_type its_target(_target->get_address(), _target->get_port());
- const queue_iterator_type target_queue_iterator(find_or_create_queue_unlocked(its_target));
- auto& its_qpair = target_queue_iterator->second;
- const bool queue_size_zero_on_entry(its_qpair.second.empty());
+ const auto its_target_iterator(find_or_create_target_unlocked(its_target));
+ auto& its_data = its_target_iterator->second;
+ const bool queue_size_zero_on_entry(its_data.queue_.empty());
if (check_message_size(nullptr, _size, its_target) == endpoint_impl::cms_ret_e::MSG_OK &&
- check_queue_limit(_data, _size, its_qpair.first)) {
- its_qpair.second.emplace_back(
- std::make_shared<message_buffer_t>(_data, _data + _size));
- its_qpair.first += _size;
+ check_queue_limit(_data, _size, its_data.queue_size_)) {
+ its_data.queue_.emplace_back(
+ std::make_pair(std::make_shared<message_buffer_t>(_data, _data + _size), 0));
+ its_data.queue_size_ += _size;
if (queue_size_zero_on_entry) { // no writing in progress
- send_queued(target_queue_iterator);
+ (void)send_queued(its_target_iterator);
}
ret = true;
}
return ret;
}
-void udp_server_endpoint_impl::send_queued(
- const queue_iterator_type _queue_iterator) {
+bool udp_server_endpoint_impl::send_queued(
+ const target_data_iterator_type _it) {
- message_buffer_ptr_t its_buffer = _queue_iterator->second.second.front();
+ static std::chrono::steady_clock::time_point its_last_sent;
+ const auto its_entry = _it->second.queue_.front();
#if 0
- std::stringstream msg;
- msg << "usei::sq(" << _queue_iterator->first.address().to_string() << ":"
- << _queue_iterator->first.port() << "): ";
- for (std::size_t i = 0; i < its_buffer->size(); ++i)
- msg << std::hex << std::setw(2) << std::setfill('0')
- << (int)(*its_buffer)[i] << " ";
- VSOMEIP_INFO << msg.str();
+ std::stringstream msg;
+ msg << "usei::sq(" << _queue_iterator->first.address().to_string() << ":"
+ << _queue_iterator->first.port() << "): ";
+ for (std::size_t i = 0; i < its_buffer->size(); ++i)
+ msg << std::hex << std::setw(2) << std::setfill('0')
+ << (int)(*its_entry.first)[i] << " ";
+ VSOMEIP_INFO << msg.str();
#endif
std::lock_guard<std::mutex> its_lock(unicast_mutex_);
+ // Check whether we need to wait (SOME/IP-TP separation time)
+ if (its_entry.second > 0) {
+ if (its_last_sent != std::chrono::steady_clock::time_point()) {
+ const auto its_elapsed
+ = std::chrono::duration_cast<std::chrono::microseconds>(
+ std::chrono::steady_clock::now() - its_last_sent).count();
+ if (its_entry.second > its_elapsed)
+ std::this_thread::sleep_for(
+ std::chrono::microseconds(its_entry.second - its_elapsed));
+ }
+ its_last_sent = std::chrono::steady_clock::now();
+ } else {
+ its_last_sent = std::chrono::steady_clock::time_point();
+ }
+
unicast_socket_.async_send_to(
- boost::asio::buffer(*its_buffer),
- _queue_iterator->first,
+ boost::asio::buffer(*its_entry.first),
+ _it->first,
std::bind(
&udp_server_endpoint_base_impl::send_cbk,
shared_from_this(),
- _queue_iterator,
+ _it,
std::placeholders::_1,
std::placeholders::_2
)
);
+
+ return false;
}
void udp_server_endpoint_impl::get_configured_times_from_endpoint(
@@ -325,126 +380,18 @@ void udp_server_endpoint_impl::join_unlocked(const std::string &_address) {
VSOMEIP_DEBUG << "Joining to multicast group " << _address
<< " from " << local_.address().to_string();
- boost::system::error_code ec;
-
- bool is_v4(false);
- bool is_v6(false);
- {
- std::lock_guard<std::mutex> its_lock(local_mutex_);
- is_v4 = local_.address().is_v4();
- is_v6 = local_.address().is_v6();
- }
-
- if (multicast_recv_buffer_.empty())
- multicast_recv_buffer_.resize(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0);
-
- if (!multicast_local_) {
- if (is_v4) {
- multicast_local_ = std::unique_ptr<endpoint_type>(
- new endpoint_type(boost::asio::ip::address_v4::any(), local_port_));
- }
- if (is_v6) {
- multicast_local_ = std::unique_ptr<endpoint_type>(
- new endpoint_type(boost::asio::ip::address_v6::any(), local_port_));
- }
- }
-
- if (!multicast_socket_) {
- multicast_socket_ = std::unique_ptr<socket_type>(
- new socket_type(service_, local_.protocol()));
-
- boost::asio::socket_base::reuse_address optionReuseAddress(true);
- multicast_socket_->set_option(optionReuseAddress, ec);
- boost::asio::detail::throw_error(ec, "reuse address in multicast");
- boost::asio::socket_base::broadcast optionBroadcast(true);
- multicast_socket_->set_option(optionBroadcast, ec);
- boost::asio::detail::throw_error(ec, "set broadcast option");
-
- multicast_socket_->bind(*multicast_local_, ec);
- boost::asio::detail::throw_error(ec, "bind multicast");
-
- const int its_udp_recv_buffer_size =
- configuration_->get_udp_receive_buffer_size();
- multicast_socket_->set_option(boost::asio::socket_base::receive_buffer_size(
- its_udp_recv_buffer_size), ec);
- if (ec) {
- VSOMEIP_WARNING << "udp_server_endpoint_impl<multicast>: couldn't set "
- << "SO_RCVBUF: " << ec.message() << " to: " << std::dec
- << its_udp_recv_buffer_size << " local port: " << std::dec
- << local_port_;
- }
-
- boost::asio::socket_base::receive_buffer_size its_option;
- multicast_socket_->get_option(its_option, ec);
- #ifdef __linux__
- // If regular setting of the buffer size did not work, try to force
- // (requires CAP_NET_ADMIN to be successful)
- if (its_option.value() < 0
- || its_option.value() < its_udp_recv_buffer_size) {
- ec.assign(setsockopt(multicast_socket_->native_handle(),
- SOL_SOCKET, SO_RCVBUFFORCE,
- &its_udp_recv_buffer_size, sizeof(its_udp_recv_buffer_size)),
- boost::system::generic_category());
- if (!ec) {
- VSOMEIP_INFO << "udp_server_endpoint_impl<multicast>: "
- << "SO_RCVBUFFORCE: successful.";
- }
- multicast_socket_->get_option(its_option, ec);
- }
- #endif
- if (ec) {
- VSOMEIP_WARNING << "udp_server_endpoint_impl<multicast>: couldn't get "
- << "SO_RCVBUF: " << ec.message() << " local port:"
- << std::dec << local_port_;
- } else {
- VSOMEIP_INFO << "udp_server_endpoint_impl<multicast>: SO_RCVBUF is: "
- << std::dec << its_option.value()
- << " (" << its_udp_recv_buffer_size << ") local port:"
- << std::dec << local_port_;
- }
-
-#ifdef _WIN32
- const char* optval("0001");
- if (is_v4) {
- ::setsockopt(multicast_socket_->native_handle(), IPPROTO_IP, IP_PKTINFO,
- optval, sizeof(optval));
- } else if (is_v6) {
- ::setsockopt(multicast_socket_->native_handle(), IPPROTO_IPV6, IPV6_PKTINFO,
- optval, sizeof(optval));
- }
+ auto its_endpoint_host = endpoint_host_.lock();
+ if (its_endpoint_host) {
+ multicast_option_t its_join_option { shared_from_this(), true,
+#if VSOMEIP_BOOST_VERSION < 106600
+ boost::asio::ip::address::from_string(_address) };
#else
- int optval(1);
- if (is_v4) {
- ::setsockopt(multicast_socket_->native_handle(), IPPROTO_IP, IP_PKTINFO,
- &optval, sizeof(optval));
- } else {
- ::setsockopt(multicast_socket_->native_handle(), IPPROTO_IPV6, IPV6_RECVPKTINFO,
- &optval, sizeof(optval));
- }
+ boost::asio::ip::make_address(_address) };
#endif
- multicast_id_++;
- receive_multicast(multicast_id_);
- }
-
- if (is_v4) {
- multicast_socket_->set_option(ip::udp_ext::socket::reuse_address(true));
- multicast_socket_->set_option(
- boost::asio::ip::multicast::enable_loopback(false));
- multicast_socket_->set_option(boost::asio::ip::multicast::join_group(
- boost::asio::ip::address::from_string(_address).to_v4(),
- local_.address().to_v4()));
- } else if (is_v6) {
- multicast_socket_->set_option(ip::udp_ext::socket::reuse_address(true));
- multicast_socket_->set_option(
- boost::asio::ip::multicast::enable_loopback(false));
- multicast_socket_->set_option(boost::asio::ip::multicast::join_group(
- boost::asio::ip::address::from_string(_address).to_v6(),
- local_.address().to_v6().scope_id()));
+ its_endpoint_host->add_multicast_option(its_join_option);
}
joined_[_address] = false;
- joined_group_ = true;
-
} catch (const std::exception &e) {
VSOMEIP_ERROR << "udp_server_endpoint_impl::join" << ":" << e.what()
<< " address: " << _address;
@@ -473,31 +420,20 @@ void udp_server_endpoint_impl::leave_unlocked(const std::string &_address) {
VSOMEIP_DEBUG << "Leaving the multicast group " << _address
<< " from " << local_.address().to_string();
- bool is_v4(false);
- bool is_v6(false);
- {
- std::lock_guard<std::mutex> its_lock(local_mutex_);
- is_v4 = local_.address().is_v4();
- is_v6 = local_.address().is_v6();
- }
- if (is_v4) {
- multicast_socket_->set_option(boost::asio::ip::multicast::leave_group(
- boost::asio::ip::address::from_string(_address)));
- } else if (is_v6) {
- multicast_socket_->set_option(boost::asio::ip::multicast::leave_group(
- boost::asio::ip::address::from_string(_address)));
+ if (multicast_socket_) {
+ auto its_endpoint_host = endpoint_host_.lock();
+ if (its_endpoint_host) {
+ multicast_option_t its_leave_option { shared_from_this(),
+#if VSOMEIP_BOOST_VERSION < 106600
+ false, boost::asio::ip::address::from_string(_address) };
+#else
+ false, boost::asio::ip::make_address(_address) };
+#endif
+ its_endpoint_host->add_multicast_option(its_leave_option);
+ }
}
joined_.erase(_address);
- if (0 == joined_.size()) {
- joined_group_ = false;
-
- boost::system::error_code ec;
- multicast_socket_->cancel(ec);
-
- multicast_socket_.reset(nullptr);
- multicast_local_.reset(nullptr);
- }
}
}
catch (const std::exception &e) {
@@ -541,8 +477,7 @@ void udp_server_endpoint_impl::set_local_port(std::uint16_t _port) {
void udp_server_endpoint_impl::on_unicast_received(
boost::system::error_code const &_error,
- std::size_t _bytes,
- boost::asio::ip::address const &_destination) {
+ std::size_t _bytes) {
if (_error != boost::asio::error::operation_aborted) {
{
@@ -550,7 +485,7 @@ void udp_server_endpoint_impl::on_unicast_received(
// & multicast messages are not processed in parallel. This aligns
// the behavior of endpoints with one and two active sockets.
std::lock_guard<std::mutex> its_lock(multicast_mutex_);
- on_message_received(_error, _bytes, _destination,
+ on_message_received(_error, _bytes, false,
unicast_remote_, unicast_recv_buffer_);
}
receive_unicast();
@@ -560,14 +495,20 @@ void udp_server_endpoint_impl::on_unicast_received(
void udp_server_endpoint_impl::on_multicast_received(
boost::system::error_code const &_error,
std::size_t _bytes,
- boost::asio::ip::address const &_destination,
- uint8_t _multicast_id) {
+ uint8_t _multicast_id,
+ const boost::asio::ip::address &_destination) {
std::lock_guard<std::mutex> its_lock(multicast_mutex_);
if (_error != boost::asio::error::operation_aborted) {
// Filter messages sent from the same source address
- if (multicast_remote_.address() != local_.address()) {
- on_message_received(_error, _bytes, _destination,
+ if (multicast_remote_.address() != local_.address()
+ && is_same_subnet(multicast_remote_.address())) {
+
+ auto find_joined = joined_.find(_destination.to_string());
+ if (find_joined != joined_.end())
+ find_joined->second = true;
+
+ on_message_received(_error, _bytes, true,
multicast_remote_, multicast_recv_buffer_);
}
@@ -577,7 +518,7 @@ void udp_server_endpoint_impl::on_multicast_received(
void udp_server_endpoint_impl::on_message_received(
boost::system::error_code const &_error, std::size_t _bytes,
- boost::asio::ip::address const &_destination,
+ bool _is_multicast,
endpoint_type const &_remote,
message_buffer_t const &_buffer) {
#if 0
@@ -585,7 +526,7 @@ void udp_server_endpoint_impl::on_message_received(
msg << "usei::rcb(" << _error.message() << "): ";
for (std::size_t i = 0; i < _bytes; ++i)
msg << std::hex << std::setw(2) << std::setfill('0')
- << (int) recv_buffer_[i] << " ";
+ << (int) _buffer[i] << " ";
VSOMEIP_INFO << msg.str();
#endif
std::shared_ptr<routing_host> its_host = routing_host_.lock();
@@ -613,7 +554,7 @@ void udp_server_endpoint_impl::on_message_received(
} else if (current_message_size > VSOMEIP_RETURN_CODE_POS &&
(_buffer[i + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION ||
!utility::is_valid_message_type(tp::tp::tp_flag_unset(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS])) ||
- /*!utility::is_valid_return_code(static_cast<return_code_e>(_buffer[i + VSOMEIP_RETURN_CODE_POS])) ||*/
+ !utility::is_valid_return_code(static_cast<return_code_e>(_buffer[i + VSOMEIP_RETURN_CODE_POS])) ||
(tp::tp::tp_flag_is_set(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS]) && get_local_port() == configuration_->get_sd_port())
)) {
if (_buffer[i + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION) {
@@ -625,9 +566,9 @@ void udp_server_endpoint_impl::on_message_received(
// ensure to send back a message w/ wrong protocol version
its_host->on_message(&_buffer[i],
VSOMEIP_SOMEIP_HEADER_SIZE + 8, this,
- _destination,
+ _is_multicast,
VSOMEIP_ROUTING_CLIENT,
- std::make_pair(ANY_UID, ANY_GID),
+ nullptr,
its_remote_address, its_remote_port);
} else if (!utility::is_valid_message_type(tp::tp::tp_flag_unset(
_buffer[i + VSOMEIP_MESSAGE_TYPE_POS]))) {
@@ -663,21 +604,9 @@ void udp_server_endpoint_impl::on_message_received(
const session_t its_session = VSOMEIP_BYTES_TO_WORD(
_buffer[i + VSOMEIP_SESSION_POS_MIN],
_buffer[i + VSOMEIP_SESSION_POS_MAX]);
- const method_t its_method = VSOMEIP_BYTES_TO_WORD(
- _buffer[i + VSOMEIP_METHOD_POS_MIN],
- _buffer[i + VSOMEIP_METHOD_POS_MAX]);
-
- std::lock_guard<std::mutex> its_requests_guard(requests_mutex_);
- requests_[its_client]
- [std::make_tuple(its_service, its_method, its_session)] = _remote;
- }
- } else if (its_service != VSOMEIP_SD_SERVICE
- && utility::is_notification(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS])
- && joined_group_) {
- boost::system::error_code ec;
- const auto found_address = joined_.find(_destination.to_string(ec));
- if (found_address != joined_.end()) {
- found_address->second = true;
+ clients_mutex_.lock();
+ clients_[its_client][its_session] = _remote;
+ clients_mutex_.unlock();
}
}
if (tp::tp::tp_flag_is_set(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS])) {
@@ -699,33 +628,17 @@ void udp_server_endpoint_impl::on_message_received(
res.second[VSOMEIP_CLIENT_POS_MIN],
res.second[VSOMEIP_CLIENT_POS_MAX]);
if (its_client != MAGIC_COOKIE_CLIENT) {
- const service_t its_service = VSOMEIP_BYTES_TO_WORD(
- res.second[VSOMEIP_SERVICE_POS_MIN],
- res.second[VSOMEIP_SERVICE_POS_MAX]);
- const method_t its_method = VSOMEIP_BYTES_TO_WORD(
- res.second[VSOMEIP_METHOD_POS_MIN],
- res.second[VSOMEIP_METHOD_POS_MAX]);
const session_t its_session = VSOMEIP_BYTES_TO_WORD(
res.second[VSOMEIP_SESSION_POS_MIN],
res.second[VSOMEIP_SESSION_POS_MAX]);
-
- std::lock_guard<std::mutex> its_requests_guard(requests_mutex_);
- requests_[its_client]
- [std::make_tuple(its_service, its_method, its_session)] = _remote;
- }
- } else if (its_service != VSOMEIP_SD_SERVICE
- && utility::is_notification(res.second[VSOMEIP_MESSAGE_TYPE_POS])
- && joined_group_) {
- boost::system::error_code ec;
- const auto found_address = joined_.find(_destination.to_string(ec));
- if (found_address != joined_.end()) {
- found_address->second = true;
+ std::lock_guard<std::mutex> its_client_lock(clients_mutex_);
+ clients_[its_client][its_session] = _remote;
}
}
its_host->on_message(&res.second[0],
static_cast<std::uint32_t>(res.second.size()),
- this, _destination, VSOMEIP_ROUTING_CLIENT,
- std::make_pair(ANY_UID, ANY_GID),
+ this, _is_multicast, VSOMEIP_ROUTING_CLIENT,
+ nullptr,
its_remote_address, its_remote_port);
}
} else {
@@ -733,9 +646,9 @@ void udp_server_endpoint_impl::on_message_received(
(current_message_size > VSOMEIP_SOMEIP_HEADER_SIZE &&
current_message_size >= remaining_bytes)) {
its_host->on_message(&_buffer[i],
- current_message_size, this, _destination,
+ current_message_size, this, _is_multicast,
VSOMEIP_ROUTING_CLIENT,
- std::make_pair(ANY_UID, ANY_GID),
+ nullptr,
its_remote_address, its_remote_port);
} else {
//ignore messages for service discovery with shorter SomeIP length
@@ -772,21 +685,63 @@ void udp_server_endpoint_impl::on_message_received(
}
}
+bool udp_server_endpoint_impl::is_same_subnet(const boost::asio::ip::address &_address) const {
+
+ bool is_same(true);
+#if VSOMEIP_BOOST_VERSION < 106600
+ // TODO: This needs some (more) testing
+ if (_address.is_v4()) {
+ uint32_t its_local(uint32_t(local_.address().to_v4().to_ulong()));
+ uint32_t its_mask(uint32_t(netmask_.to_v4().to_ulong()));
+ uint32_t its_address(uint32_t(_address.to_v4().to_ulong()));
+
+ return ((its_local & its_mask) == (its_address & its_mask));
+ } else {
+ boost::asio::ip::address_v6::bytes_type its_local(local_.address().to_v6().to_bytes());
+ boost::asio::ip::address_v6::bytes_type its_address(_address.to_v6().to_bytes());
+
+ for (size_t i = 0; i < its_local.size(); ++i) {
+ byte_t its_mask(0x00);
+ if ((i+1) * sizeof(byte_t) <= prefix_)
+ its_mask = 0xff;
+ else if (i <= prefix_)
+ its_mask = byte_t(0xff << (((i+1) * sizeof(byte_t)) - prefix_));
+
+ if ((its_local[i] & its_mask) != (its_address[i] & its_mask))
+ return (false);
+ }
+
+ return (true);
+ }
+#else
+ if (_address.is_v4()) {
+ boost::asio::ip::network_v4 its_network(local_.address().to_v4(), netmask_.to_v4());
+ boost::asio::ip::address_v4_range its_hosts = its_network.hosts();
+ is_same = (its_hosts.find(_address.to_v4()) != its_hosts.end());
+ } else {
+ boost::asio::ip::network_v6 its_network(local_.address().to_v6(), prefix_);
+ boost::asio::ip::address_v6_range its_hosts = its_network.hosts();
+ is_same = (its_hosts.find(_address.to_v6()) != its_hosts.end());
+ }
+#endif
+ return (is_same);
+}
+
void udp_server_endpoint_impl::print_status() {
std::lock_guard<std::mutex> its_lock(mutex_);
VSOMEIP_INFO << "status use: " << std::dec << local_port_
- << " number queues: " << std::dec << queues_.size()
+ << " number targets: " << std::dec << targets_.size()
<< " recv_buffer: "
<< std::dec << unicast_recv_buffer_.capacity()
<< " multicast_recv_buffer: "
<< std::dec << multicast_recv_buffer_.capacity();
- for (const auto &c : queues_) {
+ for (const auto &c : targets_) {
std::size_t its_data_size(0);
std::size_t its_queue_size(0);
- its_queue_size = c.second.second.size();
- its_data_size = c.second.first;
+ its_queue_size = c.second.queue_.size();
+ its_data_size = c.second.queue_size_;
boost::system::error_code ec;
VSOMEIP_INFO << "status use: client: "
@@ -798,14 +753,16 @@ void udp_server_endpoint_impl::print_status() {
}
std::string udp_server_endpoint_impl::get_remote_information(
- const queue_iterator_type _queue_iterator) const {
+ const target_data_iterator_type _it) const {
+
boost::system::error_code ec;
- return _queue_iterator->first.address().to_string(ec) + ":"
- + std::to_string(_queue_iterator->first.port());
+ return _it->first.address().to_string(ec) + ":"
+ + std::to_string(_it->first.port());
}
std::string udp_server_endpoint_impl::get_remote_information(
const endpoint_type& _remote) const {
+
boost::system::error_code ec;
return _remote.address().to_string(ec) + ":"
+ std::to_string(_remote.port());
@@ -815,7 +772,7 @@ bool udp_server_endpoint_impl::is_reliable() const {
return false;
}
-const std::string udp_server_endpoint_impl::get_address_port_local() const {
+std::string udp_server_endpoint_impl::get_address_port_local() const {
std::lock_guard<std::mutex> its_lock(unicast_mutex_);
std::string its_address_port;
@@ -835,9 +792,171 @@ const std::string udp_server_endpoint_impl::get_address_port_local() const {
bool udp_server_endpoint_impl::tp_segmentation_enabled(
service_t _service, method_t _method) const {
- return configuration_->tp_segment_messages_service_to_client(_service,
- local_.address().to_string(),
- local_.port(), _method);
+ return configuration_->is_tp_service(_service,
+ local_.address().to_string(), local_.port(),
+ _method);
+}
+
+void
+udp_server_endpoint_impl::set_multicast_option(
+ const boost::asio::ip::address &_address, bool _is_join) {
+
+ boost::system::error_code ec;
+
+ if (_is_join) {
+ if (!multicast_socket_) {
+ std::lock_guard<std::mutex> its_guard(multicast_mutex_);
+
+ multicast_socket_ = std::unique_ptr<socket_type>(
+ new socket_type(io_, local_.protocol()));
+
+ multicast_socket_->set_option(ip::udp::socket::reuse_address(true), ec);
+ if (ec)
+ VSOMEIP_ERROR << __func__
+ << ": set reuse address option failed (" << ec.message() << ")";
+
+#ifdef _WIN32
+ const char *its_option("0001");
+ ::setsockopt(multicast_socket_->native_handle(),
+ (is_v4_ ? IPPROTO_IP : IPPROTO_IPV6),
+ (is_v4_ ? IP_PKTINFO : IPV6_PKTINFO),
+ its_option, sizeof(its_option));
+#else
+ int its_pktinfo_option(1);
+ ::setsockopt(multicast_socket_->native_handle(),
+ (is_v4_ ? IPPROTO_IP : IPPROTO_IPV6),
+ (is_v4_ ? IP_PKTINFO : IPV6_PKTINFO),
+ &its_pktinfo_option, sizeof(its_pktinfo_option));
+#endif
+
+ if (multicast_recv_buffer_.empty())
+ multicast_recv_buffer_.resize(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0);
+
+ if (!multicast_local_) {
+ if (is_v4_) {
+ multicast_local_ = std::unique_ptr<endpoint_type>(
+ new endpoint_type(boost::asio::ip::address_v4::any(), local_port_));
+ } else { // is_v6
+ multicast_local_ = std::unique_ptr<endpoint_type>(
+ new endpoint_type(boost::asio::ip::address_v6::any(), local_port_));
+ }
+ }
+
+ multicast_socket_->bind(*multicast_local_, ec);
+ if (ec)
+ VSOMEIP_ERROR << __func__
+ << ": bind failed (" << ec.message() << ")";
+
+ const int its_udp_recv_buffer_size =
+ configuration_->get_udp_receive_buffer_size();
+
+ multicast_socket_->set_option(boost::asio::socket_base::receive_buffer_size(
+ its_udp_recv_buffer_size), ec);
+
+#ifndef _WIN32
+ // define socket timeout
+ struct timeval timeout;
+ timeout.tv_sec = 0;
+ timeout.tv_usec = VSOMEIP_SETSOCKOPT_TIMEOUT_US;
+
+ if (setsockopt(
+ multicast_socket_->native_handle(),
+ SOL_SOCKET, SO_RCVTIMEO,
+ &timeout, sizeof(timeout)) == -1) {
+ VSOMEIP_WARNING << __func__
+ << ": unable to setsockopt SO_RCVTIMEO";
+ }
+
+ if (setsockopt(
+ multicast_socket_->native_handle(),
+ SOL_SOCKET, SO_SNDTIMEO,
+ &timeout, sizeof(timeout)) == -1) {
+ VSOMEIP_WARNING << __func__
+ << ": unable to setsockopt SO_SNDTIMEO";
+ }
+#endif
+
+ if (ec) {
+ VSOMEIP_WARNING << "udp_server_endpoint_impl<multicast>: couldn't set "
+ << "SO_RCVBUF: " << ec.message() << " to: " << std::dec
+ << its_udp_recv_buffer_size << " local port: " << std::dec
+ << local_port_;
+ }
+
+ boost::asio::socket_base::receive_buffer_size its_option;
+ multicast_socket_->get_option(its_option, ec);
+#ifdef __linux__
+ // If regular setting of the buffer size did not work, try to force
+ // (requires CAP_NET_ADMIN to be successful)
+ if (its_option.value() < 0
+ || its_option.value() < its_udp_recv_buffer_size) {
+ ec.assign(setsockopt(multicast_socket_->native_handle(),
+ SOL_SOCKET, SO_RCVBUFFORCE,
+ &its_udp_recv_buffer_size, sizeof(its_udp_recv_buffer_size)),
+ boost::system::generic_category());
+ if (!ec) {
+ VSOMEIP_INFO << "udp_server_endpoint_impl<multicast>: "
+ << "SO_RCVBUFFORCE: successful.";
+ }
+ multicast_socket_->get_option(its_option, ec);
+ }
+#endif
+ if (ec) {
+ VSOMEIP_WARNING << "udp_server_endpoint_impl<multicast>: couldn't get "
+ << "SO_RCVBUF: " << ec.message() << " local port:"
+ << std::dec << local_port_;
+ } else {
+ VSOMEIP_INFO << "udp_server_endpoint_impl<multicast>: SO_RCVBUF is: "
+ << std::dec << its_option.value()
+ << " (" << its_udp_recv_buffer_size << ") local port:"
+ << std::dec << local_port_;
+ }
+
+ multicast_id_++;
+ receive_multicast(multicast_id_);
+ }
+
+ boost::asio::ip::multicast::join_group its_join_option;
+ {
+ std::lock_guard<std::mutex> its_lock(local_mutex_);
+ if (is_v4_) {
+
+ its_join_option = boost::asio::ip::multicast::join_group(
+ _address.to_v4(),
+ local_.address().to_v4());
+ } else {
+ its_join_option = boost::asio::ip::multicast::join_group(
+ _address.to_v6(),
+ static_cast<unsigned int>(local_.address().to_v6().scope_id()));
+ }
+ }
+ multicast_socket_->set_option(its_join_option, ec);
+
+ if (!ec) {
+ std::lock_guard<std::mutex> its_guard(multicast_mutex_);
+ joined_[_address.to_string()] = false;
+ joined_group_ = true;
+ }
+ } else {
+ if (multicast_socket_) {
+ boost::asio::ip::multicast::leave_group its_leave_option(_address);
+ multicast_socket_->set_option(its_leave_option, ec);
+
+ if (!ec) {
+ std::lock_guard<std::mutex> its_guard(multicast_mutex_);
+ joined_.erase(_address.to_string());
+
+ if (0 == joined_.size()) {
+ joined_group_ = false;
+
+ multicast_socket_->cancel(ec);
+
+ multicast_socket_.reset(nullptr);
+ multicast_local_.reset(nullptr);
+ }
+ }
+ }
+ }
}
} // namespace vsomeip_v3