summaryrefslogtreecommitdiff
path: root/implementation/endpoints/src/udp_server_endpoint_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/endpoints/src/udp_server_endpoint_impl.cpp')
-rw-r--r--implementation/endpoints/src/udp_server_endpoint_impl.cpp492
1 files changed, 360 insertions, 132 deletions
diff --git a/implementation/endpoints/src/udp_server_endpoint_impl.cpp b/implementation/endpoints/src/udp_server_endpoint_impl.cpp
index 6ad7ce8..0a65b2e 100644
--- a/implementation/endpoints/src/udp_server_endpoint_impl.cpp
+++ b/implementation/endpoints/src/udp_server_endpoint_impl.cpp
@@ -9,70 +9,87 @@
#include <boost/asio/ip/multicast.hpp>
#include <vsomeip/constants.hpp>
+#include <vsomeip/internal/logger.hpp>
#include "../include/endpoint_definition.hpp"
#include "../include/endpoint_host.hpp"
+#include "../include/tp.hpp"
+#include "../../routing/include/routing_host.hpp"
#include "../include/udp_server_endpoint_impl.hpp"
#include "../../configuration/include/configuration.hpp"
-#include "../../logging/include/logger.hpp"
#include "../../utility/include/byteorder.hpp"
#include "../../utility/include/utility.hpp"
#include "../../service_discovery/include/defines.hpp"
namespace ip = boost::asio::ip;
-namespace vsomeip {
+namespace vsomeip_v3 {
udp_server_endpoint_impl::udp_server_endpoint_impl(
- std::shared_ptr< endpoint_host > _host,
- endpoint_type _local,
+ const std::shared_ptr<endpoint_host>& _endpoint_host,
+ const std::shared_ptr<routing_host>& _routing_host,
+ const endpoint_type& _local,
boost::asio::io_service &_io,
- configuration::endpoint_queue_limit_t _queue_limit,
- std::uint32_t _udp_receive_buffer_size)
- : server_endpoint_impl<ip::udp_ext>(
- _host, _local, _io, VSOMEIP_MAX_UDP_MESSAGE_SIZE, _queue_limit),
- socket_(_io, _local.protocol()),
+ const std::shared_ptr<configuration>& _configuration) :
+ server_endpoint_impl<ip::udp_ext>(_endpoint_host, _routing_host, _local,
+ _io, VSOMEIP_MAX_UDP_MESSAGE_SIZE,
+ _configuration->get_endpoint_queue_limit(_configuration->get_unicast_address().to_string(), _local.port()),
+ _configuration),
+ unicast_socket_(_io, _local.protocol()),
+ unicast_recv_buffer_(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0),
joined_group_(false),
- recv_buffer_(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0),
- local_port_(_local.port()) {
+ local_port_(_local.port()),
+ tp_reassembler_(std::make_shared<tp::tp_reassembler>(_configuration->get_max_message_size_unreliable(), _io)),
+ tp_cleanup_timer_(_io) {
+ is_supporting_someip_tp_ = true;
+
boost::system::error_code ec;
boost::asio::socket_base::reuse_address optionReuseAddress(true);
- socket_.set_option(optionReuseAddress, ec);
+ unicast_socket_.set_option(optionReuseAddress, ec);
boost::asio::detail::throw_error(ec, "reuse address");
- socket_.bind(_local, ec);
+#ifndef _WIN32
+ // If specified, bind to device
+ std::string its_device(configuration_->get_device());
+ if (its_device != "") {
+ if (setsockopt(unicast_socket_.native_handle(),
+ SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (int)its_device.size()) == -1) {
+ VSOMEIP_WARNING << "UDP Server: Could not bind to device \"" << its_device << "\"";
+ }
+ }
+#endif
+
+ unicast_socket_.bind(_local, ec);
boost::asio::detail::throw_error(ec, "bind");
- if (_local.address().is_v4()) {
- boost::asio::ip::address_v4 its_unicast_address
- = _host->get_configuration()->get_unicast_address().to_v4();
- boost::asio::ip::multicast::outbound_interface option(its_unicast_address);
- socket_.set_option(option, ec);
+ if (local_.address().is_v4()) {
+ boost::asio::ip::multicast::outbound_interface option(_local.address().to_v4());
+ unicast_socket_.set_option(option, ec);
boost::asio::detail::throw_error(ec, "outbound interface option IPv4");
- } else if (_local.address().is_v6()) {
- boost::asio::ip::address_v6 its_unicast_address
- = _host->get_configuration()->get_unicast_address().to_v6();
+ } else if (local_.address().is_v6()) {
boost::asio::ip::multicast::outbound_interface option(
- static_cast<unsigned int>(its_unicast_address.scope_id()));
- socket_.set_option(option, ec);
+ static_cast<unsigned int>(local_.address().to_v6().scope_id()));
+ unicast_socket_.set_option(option, ec);
boost::asio::detail::throw_error(ec, "outbound interface option IPv6");
}
boost::asio::socket_base::broadcast option(true);
- socket_.set_option(option, ec);
+ unicast_socket_.set_option(option, ec);
boost::asio::detail::throw_error(ec, "broadcast option");
+ const std::uint32_t its_udp_recv_buffer_size =
+ configuration_->get_udp_receive_buffer_size();
+ unicast_socket_.set_option(boost::asio::socket_base::receive_buffer_size(
+ its_udp_recv_buffer_size), ec);
- socket_.set_option(boost::asio::socket_base::receive_buffer_size(
- _udp_receive_buffer_size), ec);
if (ec) {
VSOMEIP_WARNING << "udp_server_endpoint_impl:: couldn't set "
<< "SO_RCVBUF: " << ec.message() << " to: " << std::dec
- << _udp_receive_buffer_size << " local port: " << std::dec
+ << its_udp_recv_buffer_size << " local port: " << std::dec
<< local_port_;
} else {
boost::asio::socket_base::receive_buffer_size its_option;
- socket_.get_option(its_option, ec);
+ unicast_socket_.get_option(its_option, ec);
if (ec) {
VSOMEIP_WARNING << "udp_server_endpoint_impl: couldn't get "
<< "SO_RCVBUF: " << ec.message() << " local port:"
@@ -83,14 +100,13 @@ udp_server_endpoint_impl::udp_server_endpoint_impl(
}
}
-
#ifdef _WIN32
const char* optval("0001");
- ::setsockopt(socket_.native(), IPPROTO_IP, IP_PKTINFO,
+ ::setsockopt(unicast_socket_.native_handle(), IPPROTO_IP, IP_PKTINFO,
optval, sizeof(optval));
#else
int optval(1);
- ::setsockopt(socket_.native(), IPPROTO_IP, IP_PKTINFO,
+ ::setsockopt(unicast_socket_.native_handle(), IPPROTO_IP, IP_PKTINFO,
&optval, sizeof(optval));
#endif
}
@@ -108,24 +124,63 @@ void udp_server_endpoint_impl::start() {
void udp_server_endpoint_impl::stop() {
server_endpoint_impl::stop();
+
{
- std::lock_guard<std::mutex> its_lock(socket_mutex_);
- if (socket_.is_open()) {
+ std::lock_guard<std::mutex> its_lock(unicast_socket_mutex_);
+
+ if (unicast_socket_.is_open()) {
boost::system::error_code its_error;
- socket_.shutdown(socket_type::shutdown_both, its_error);
- socket_.close(its_error);
+ unicast_socket_.shutdown(socket_type::shutdown_both, its_error);
+ unicast_socket_.close(its_error);
}
}
+
+ {
+ std::lock_guard<std::mutex> its_lock(multicast_socket_mutex_);
+
+ if (multicast_socket_ && multicast_socket_->is_open()) {
+ boost::system::error_code its_error;
+ multicast_socket_->shutdown(socket_type::shutdown_both, its_error);
+ multicast_socket_->close(its_error);
+ }
+ }
+
+ tp_reassembler_->stop();
}
void udp_server_endpoint_impl::receive() {
- std::lock_guard<std::mutex> its_lock(socket_mutex_);
- if(socket_.is_open()) {
- socket_.async_receive_from(
- boost::asio::buffer(&recv_buffer_[0], max_message_size_),
- remote_,
+ receive_unicast();
+ receive_multicast();
+}
+
+void udp_server_endpoint_impl::receive_unicast() {
+ std::lock_guard<std::mutex> its_lock(unicast_socket_mutex_);
+
+ if(unicast_socket_.is_open()) {
+ unicast_socket_.async_receive_from(
+ boost::asio::buffer(&unicast_recv_buffer_[0], max_message_size_),
+ unicast_remote_,
std::bind(
- &udp_server_endpoint_impl::receive_cbk,
+ &udp_server_endpoint_impl::on_unicast_received,
+ std::dynamic_pointer_cast<
+ udp_server_endpoint_impl >(shared_from_this()),
+ std::placeholders::_1,
+ std::placeholders::_2,
+ std::placeholders::_3
+ )
+ );
+ }
+}
+
+void udp_server_endpoint_impl::receive_multicast() {
+ std::lock_guard<std::mutex> its_lock(multicast_socket_mutex_);
+
+ if (multicast_socket_ && multicast_socket_->is_open()) {
+ multicast_socket_->async_receive_from(
+ boost::asio::buffer(&(*multicast_recv_buffer_)[0], max_message_size_),
+ multicast_remote_,
+ std::bind(
+ &udp_server_endpoint_impl::on_multicast_received,
std::dynamic_pointer_cast<
udp_server_endpoint_impl >(shared_from_this()),
std::placeholders::_1,
@@ -138,14 +193,39 @@ void udp_server_endpoint_impl::receive() {
bool udp_server_endpoint_impl::send_to(
const std::shared_ptr<endpoint_definition> _target,
- const byte_t *_data, uint32_t _size, bool _flush) {
+ const byte_t *_data, uint32_t _size) {
std::lock_guard<std::mutex> its_lock(mutex_);
endpoint_type its_target(_target->get_address(), _target->get_port());
- return send_intern(its_target, _data, _size, _flush);
+ return send_intern(its_target, _data, _size);
+}
+
+bool udp_server_endpoint_impl::send_error(
+ const std::shared_ptr<endpoint_definition> _target,
+ const byte_t *_data, uint32_t _size) {
+ bool ret(false);
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ const endpoint_type its_target(_target->get_address(), _target->get_port());
+ const queue_iterator_type target_queue_iterator(find_or_create_queue_unlocked(its_target));
+ auto& its_qpair = target_queue_iterator->second;
+ const bool queue_size_zero_on_entry(its_qpair.second.empty());
+
+ if (check_message_size(nullptr, _size, its_target) == endpoint_impl::cms_ret_e::MSG_OK &&
+ check_queue_limit(_data, _size, its_qpair.first)) {
+ its_qpair.second.emplace_back(
+ std::make_shared<message_buffer_t>(_data, _data + _size));
+ its_qpair.first += _size;
+
+ if (queue_size_zero_on_entry) { // no writing in progress
+ send_queued(target_queue_iterator);
+ }
+ ret = true;
+ }
+ return ret;
}
void udp_server_endpoint_impl::send_queued(
const queue_iterator_type _queue_iterator) {
+
message_buffer_ptr_t its_buffer = _queue_iterator->second.second.front();
#if 0
std::stringstream msg;
@@ -156,9 +236,9 @@ void udp_server_endpoint_impl::send_queued(
<< (int)(*its_buffer)[i] << " ";
VSOMEIP_INFO << msg.str();
#endif
- {
- std::lock_guard<std::mutex> its_lock(socket_mutex_);
- socket_.async_send_to(
+ std::lock_guard<std::mutex> its_lock(unicast_socket_mutex_);
+
+ unicast_socket_.async_send_to(
boost::asio::buffer(*its_buffer),
_queue_iterator->first,
std::bind(
@@ -169,7 +249,16 @@ void udp_server_endpoint_impl::send_queued(
std::placeholders::_2
)
);
- }
+}
+
+void udp_server_endpoint_impl::get_configured_times_from_endpoint(
+ service_t _service, method_t _method,
+ std::chrono::nanoseconds *_debouncing,
+ std::chrono::nanoseconds *_maximum_retention) const {
+ configuration_->get_configured_timing_responses(_service,
+ udp_server_endpoint_base_impl::local_.address().to_string(),
+ udp_server_endpoint_base_impl::local_.port(), _method,
+ _debouncing, _maximum_retention);
}
bool udp_server_endpoint_impl::is_joined(const std::string &_address) const {
@@ -193,6 +282,70 @@ void udp_server_endpoint_impl::join(const std::string &_address) {
auto join_func = [this](const std::string &_address) {
try {
+ VSOMEIP_TRACE << "Joining to multicast group " << _address
+ << " from " << local_.address().to_string();
+
+ boost::system::error_code ec;
+
+ if (!multicast_recv_buffer_) {
+ multicast_recv_buffer_ = std::unique_ptr<message_buffer_t>(
+ new message_buffer_t(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0));
+ }
+
+ if (!multicast_ep_) {
+ multicast_ep_ = std::unique_ptr<endpoint_type>(
+ new endpoint_type(boost::asio::ip::address_v4::any(), local_port_));
+ }
+
+ if (!multicast_socket_) {
+ multicast_socket_ = std::unique_ptr<socket_type>(
+ new socket_type(service_, local_.protocol()));
+
+ boost::asio::socket_base::reuse_address optionReuseAddress(true);
+ multicast_socket_->set_option(optionReuseAddress, ec);
+ boost::asio::detail::throw_error(ec, "reuse address in multicast");
+
+ multicast_socket_->bind(*multicast_ep_, ec);
+ boost::asio::detail::throw_error(ec, "bind multicast");
+
+ const std::uint32_t its_udp_recv_buffer_size =
+ configuration_->get_udp_receive_buffer_size();
+
+ multicast_socket_->set_option(boost::asio::socket_base::receive_buffer_size(
+ its_udp_recv_buffer_size), ec);
+
+ if (ec) {
+ VSOMEIP_WARNING << "udp_server_endpoint_impl:: couldn't set "
+ << "SO_RCVBUF: " << ec.message() << " to: " << std::dec
+ << its_udp_recv_buffer_size << " local port: " << std::dec
+ << local_port_;
+ } else {
+ boost::asio::socket_base::receive_buffer_size its_option;
+ multicast_socket_->get_option(its_option, ec);
+
+ if (ec) {
+ VSOMEIP_WARNING << "udp_server_endpoint_impl: couldn't get "
+ << "SO_RCVBUF: " << ec.message() << " local port:"
+ << std::dec << local_port_;
+ } else {
+ VSOMEIP_INFO << "udp_server_endpoint_impl: SO_RCVBUF (Multicast) is: "
+ << std::dec << its_option.value();
+ }
+ }
+
+#ifdef _WIN32
+ const char* optval("0001");
+ ::setsockopt(multicast_socket_->native_handle(), IPPROTO_IP, IP_PKTINFO,
+ optval, sizeof(optval));
+#else
+ int optval(1);
+ ::setsockopt(multicast_socket_->native_handle(), IPPROTO_IP, IP_PKTINFO,
+ &optval, sizeof(optval));
+#endif
+
+ receive_multicast();
+ }
+
bool is_v4(false);
bool is_v6(false);
{
@@ -200,30 +353,31 @@ void udp_server_endpoint_impl::join(const std::string &_address) {
is_v4 = local_.address().is_v4();
is_v6 = local_.address().is_v6();
}
+
if (is_v4) {
- std::lock_guard<std::mutex> its_lock(socket_mutex_);
- socket_.set_option(ip::udp_ext::socket::reuse_address(true));
- socket_.set_option(
+ std::lock_guard<std::mutex> its_lock(multicast_socket_mutex_);
+ multicast_socket_->set_option(ip::udp_ext::socket::reuse_address(true));
+ multicast_socket_->set_option(
boost::asio::ip::multicast::enable_loopback(false));
#ifdef _WIN32
- socket_.set_option(boost::asio::ip::multicast::join_group(
+ multicast_socket_->set_option(boost::asio::ip::multicast::join_group(
boost::asio::ip::address::from_string(_address).to_v4(),
local_.address().to_v4()));
#else
- socket_.set_option(boost::asio::ip::multicast::join_group(
+ multicast_socket_->set_option(boost::asio::ip::multicast::join_group(
boost::asio::ip::address::from_string(_address).to_v4()));
#endif
} else if (is_v6) {
- std::lock_guard<std::mutex> its_lock(socket_mutex_);
- socket_.set_option(ip::udp_ext::socket::reuse_address(true));
- socket_.set_option(
+ std::lock_guard<std::mutex> its_lock(multicast_socket_mutex_);
+ multicast_socket_->set_option(ip::udp_ext::socket::reuse_address(true));
+ multicast_socket_->set_option(
boost::asio::ip::multicast::enable_loopback(false));
#ifdef _WIN32
- socket_.set_option(boost::asio::ip::multicast::join_group(
+ multicast_socket_->set_option(boost::asio::ip::multicast::join_group(
boost::asio::ip::address::from_string(_address).to_v6(),
local_.address().to_v6().scope_id()));
#else
- socket_.set_option(boost::asio::ip::multicast::join_group(
+ multicast_socket_->set_option(boost::asio::ip::multicast::join_group(
boost::asio::ip::address::from_string(_address).to_v6()));
#endif
}
@@ -249,6 +403,9 @@ void udp_server_endpoint_impl::join(const std::string &_address) {
void udp_server_endpoint_impl::leave(const std::string &_address) {
try {
if (is_joined(_address)) {
+ VSOMEIP_TRACE << "Leaving the multicast group " << _address
+ << " from " << local_.address().to_string();
+
bool is_v4(false);
bool is_v6(false);
{
@@ -257,12 +414,12 @@ void udp_server_endpoint_impl::leave(const std::string &_address) {
is_v6 = local_.address().is_v6();
}
if (is_v4) {
- std::lock_guard<std::mutex> its_lock(socket_mutex_);
- socket_.set_option(boost::asio::ip::multicast::leave_group(
+ std::lock_guard<std::mutex> its_lock(multicast_socket_mutex_);
+ multicast_socket_->set_option(boost::asio::ip::multicast::leave_group(
boost::asio::ip::address::from_string(_address)));
} else if (is_v6) {
- std::lock_guard<std::mutex> its_lock(socket_mutex_);
- socket_.set_option(boost::asio::ip::multicast::leave_group(
+ std::lock_guard<std::mutex> its_lock(multicast_socket_mutex_);
+ multicast_socket_->set_option(boost::asio::ip::multicast::leave_group(
boost::asio::ip::address::from_string(_address)));
}
{
@@ -270,6 +427,10 @@ void udp_server_endpoint_impl::leave(const std::string &_address) {
joined_.erase(_address);
if (!joined_.size()) {
joined_group_ = false;
+
+ multicast_socket_.reset(nullptr);
+ multicast_ep_.reset(nullptr);
+ multicast_recv_buffer_.reset(nullptr);
}
}
}
@@ -308,10 +469,32 @@ std::uint16_t udp_server_endpoint_impl::get_local_port() const {
return local_port_;
}
-// TODO: find a better way to structure the receive functions
-void udp_server_endpoint_impl::receive_cbk(
- boost::system::error_code const &_error, std::size_t _bytes,
+void udp_server_endpoint_impl::on_unicast_received(
+ boost::system::error_code const &_error,
+ std::size_t _bytes,
+ boost::asio::ip::address const &_destination) {
+ on_message_received(_error, _bytes, _destination, unicast_remote_, unicast_recv_buffer_);
+ receive_unicast();
+}
+
+void udp_server_endpoint_impl::on_multicast_received(
+ boost::system::error_code const &_error,
+ std::size_t _bytes,
boost::asio::ip::address const &_destination) {
+
+ // Filter messages sent from the same source address
+ if (multicast_remote_.address() != local_.address()) {
+ on_message_received(_error, _bytes, _destination, multicast_remote_, *multicast_recv_buffer_);
+ }
+
+ receive_multicast();
+}
+
+void udp_server_endpoint_impl::on_message_received(
+ boost::system::error_code const &_error, std::size_t _bytes,
+ boost::asio::ip::address const &_destination,
+ endpoint_type const &_remote,
+ message_buffer_t const &_buffer) {
#if 0
std::stringstream msg;
msg << "usei::rcb(" << _error.message() << "): ";
@@ -320,16 +503,17 @@ void udp_server_endpoint_impl::receive_cbk(
<< (int) recv_buffer_[i] << " ";
VSOMEIP_INFO << msg.str();
#endif
- std::shared_ptr<endpoint_host> its_host = this->host_.lock();
+ std::shared_ptr<routing_host> its_host = routing_host_.lock();
+
if (its_host) {
if (!_error && 0 < _bytes) {
std::size_t remaining_bytes = _bytes;
std::size_t i = 0;
- const boost::asio::ip::address its_remote_address(remote_.address());
- const std::uint16_t its_remote_port(remote_.port());
+ const boost::asio::ip::address its_remote_address(_remote.address());
+ const std::uint16_t its_remote_port(_remote.port());
do {
uint64_t read_message_size
- = utility::get_message_size(&this->recv_buffer_[i],
+ = utility::get_message_size(&_buffer[i],
remaining_bytes);
if (read_message_size > MESSAGE_SIZE_UNLIMITED) {
VSOMEIP_ERROR << "Message size exceeds allowed maximum!";
@@ -342,62 +526,58 @@ void udp_server_endpoint_impl::receive_cbk(
VSOMEIP_ERROR << "buffer underflow in udp client endpoint ~> abort!";
return;
} else if (current_message_size > VSOMEIP_RETURN_CODE_POS &&
- (recv_buffer_[i + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION ||
- !utility::is_valid_message_type(static_cast<message_type_e>(recv_buffer_[i + VSOMEIP_MESSAGE_TYPE_POS])) ||
- !utility::is_valid_return_code(static_cast<return_code_e>(recv_buffer_[i + VSOMEIP_RETURN_CODE_POS]))
+ (_buffer[i + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION ||
+ !utility::is_valid_message_type(tp::tp::tp_flag_unset(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS])) ||
+ !utility::is_valid_return_code(static_cast<return_code_e>(_buffer[i + VSOMEIP_RETURN_CODE_POS]))
)) {
- if (recv_buffer_[i + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION) {
+ if (_buffer[i + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION) {
VSOMEIP_ERROR << "use: Wrong protocol version: 0x"
<< std::hex << std::setw(2) << std::setfill('0')
- << std::uint32_t(recv_buffer_[i + VSOMEIP_PROTOCOL_VERSION_POS])
+ << std::uint32_t(_buffer[i + VSOMEIP_PROTOCOL_VERSION_POS])
<< " local: " << get_address_port_local()
<< " remote: " << its_remote_address << ":" << std::dec << its_remote_port;
// ensure to send back a message w/ wrong protocol version
- its_host->on_message(&recv_buffer_[i],
+ its_host->on_message(&_buffer[i],
VSOMEIP_SOMEIP_HEADER_SIZE + 8, this,
_destination,
VSOMEIP_ROUTING_CLIENT,
- its_remote_address,
- its_remote_port);
- } else if (!utility::is_valid_message_type(static_cast<message_type_e>(
- recv_buffer_[i + VSOMEIP_MESSAGE_TYPE_POS]))) {
+ std::make_pair(ANY_UID, ANY_GID),
+ its_remote_address, its_remote_port);
+ } else if (!utility::is_valid_message_type(tp::tp::tp_flag_unset(
+ _buffer[i + VSOMEIP_MESSAGE_TYPE_POS]))) {
VSOMEIP_ERROR << "use: Invalid message type: 0x"
<< std::hex << std::setw(2) << std::setfill('0')
- << std::uint32_t(recv_buffer_[i + VSOMEIP_MESSAGE_TYPE_POS])
+ << std::uint32_t(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS])
<< " local: " << get_address_port_local()
<< " remote: " << its_remote_address << ":" << std::dec << its_remote_port;
} else if (!utility::is_valid_return_code(static_cast<return_code_e>(
- recv_buffer_[i + VSOMEIP_RETURN_CODE_POS]))) {
+ _buffer[i + VSOMEIP_RETURN_CODE_POS]))) {
VSOMEIP_ERROR << "use: Invalid return code: 0x"
<< std::hex << std::setw(2) << std::setfill('0')
- << std::uint32_t(recv_buffer_[i + VSOMEIP_RETURN_CODE_POS])
+ << std::uint32_t(_buffer[i + VSOMEIP_RETURN_CODE_POS])
<< " local: " << get_address_port_local()
<< " remote: " << its_remote_address << ":" << std::dec << its_remote_port;
}
- receive();
return;
}
remaining_bytes -= current_message_size;
- service_t its_service = VSOMEIP_BYTES_TO_WORD(recv_buffer_[i + VSOMEIP_SERVICE_POS_MIN],
- recv_buffer_[i + VSOMEIP_SERVICE_POS_MAX]);
+ const service_t its_service = VSOMEIP_BYTES_TO_WORD(_buffer[i + VSOMEIP_SERVICE_POS_MIN],
+ _buffer[i + VSOMEIP_SERVICE_POS_MAX]);
if (utility::is_request(
- recv_buffer_[i + VSOMEIP_MESSAGE_TYPE_POS])) {
- client_t its_client;
- std::memcpy(&its_client,
- &recv_buffer_[i + VSOMEIP_CLIENT_POS_MIN],
- sizeof(client_t));
- if (its_client != MAGIC_COOKIE_NETWORK_BYTE_ORDER) {
- session_t its_session;
- std::memcpy(&its_session,
- &recv_buffer_[i + VSOMEIP_SESSION_POS_MIN],
- sizeof(session_t));
+ _buffer[i + VSOMEIP_MESSAGE_TYPE_POS])) {
+ const client_t its_client = VSOMEIP_BYTES_TO_WORD(
+ _buffer[i + VSOMEIP_CLIENT_POS_MIN],
+ _buffer[i + VSOMEIP_CLIENT_POS_MAX]);
+ if (its_client != MAGIC_COOKIE_CLIENT) {
+ const session_t its_session = VSOMEIP_BYTES_TO_WORD(
+ _buffer[i + VSOMEIP_SESSION_POS_MIN],
+ _buffer[i + VSOMEIP_SESSION_POS_MAX]);
clients_mutex_.lock();
- clients_[its_client][its_session] = remote_;
- endpoint_to_client_[remote_] = its_client;
+ clients_[its_client][its_session] = _remote;
clients_mutex_.unlock();
}
} else if (its_service != VSOMEIP_SD_SERVICE
- && utility::is_notification(recv_buffer_[i + VSOMEIP_MESSAGE_TYPE_POS])
+ && utility::is_notification(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS])
&& joined_group_) {
std::lock_guard<std::mutex> its_lock(joined_mutex_);
boost::system::error_code ec;
@@ -406,61 +586,91 @@ void udp_server_endpoint_impl::receive_cbk(
found_address->second = true;
}
}
- if (its_service != VSOMEIP_SD_SERVICE ||
- (current_message_size > VSOMEIP_SOMEIP_HEADER_SIZE &&
- current_message_size >= remaining_bytes)) {
- its_host->on_message(&recv_buffer_[i],
- current_message_size, this, _destination,
- VSOMEIP_ROUTING_CLIENT, its_remote_address,
- its_remote_port);
+ if (tp::tp::tp_flag_is_set(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS])) {
+ const auto res = tp_reassembler_->process_tp_message(
+ &_buffer[i], current_message_size,
+ its_remote_address, its_remote_port);
+ if (res.first) {
+ if (utility::is_request(res.second[VSOMEIP_MESSAGE_TYPE_POS])) {
+ const client_t its_client = VSOMEIP_BYTES_TO_WORD(
+ res.second[VSOMEIP_CLIENT_POS_MIN],
+ res.second[VSOMEIP_CLIENT_POS_MAX]);
+ if (its_client != MAGIC_COOKIE_CLIENT) {
+ const session_t its_session = VSOMEIP_BYTES_TO_WORD(
+ res.second[VSOMEIP_SESSION_POS_MIN],
+ res.second[VSOMEIP_SESSION_POS_MAX]);
+ clients_mutex_.lock();
+ clients_[its_client][its_session] = _remote;
+ clients_mutex_.unlock();
+ }
+ } else if (its_service != VSOMEIP_SD_SERVICE
+ && utility::is_notification(res.second[VSOMEIP_MESSAGE_TYPE_POS])
+ && joined_group_) {
+ std::lock_guard<std::mutex> its_lock(joined_mutex_);
+ boost::system::error_code ec;
+ const auto found_address = joined_.find(_destination.to_string(ec));
+ if (found_address != joined_.end()) {
+ found_address->second = true;
+ }
+ }
+ its_host->on_message(&res.second[0],
+ static_cast<std::uint32_t>(res.second.size()),
+ this, _destination, VSOMEIP_ROUTING_CLIENT,
+ std::make_pair(ANY_UID, ANY_GID),
+ its_remote_address, its_remote_port);
+ }
} else {
- //ignore messages for service discovery with shorter SomeIP length
- VSOMEIP_ERROR << "Received an unreliable vSomeIP SD message with too short length field";
+ if (its_service != VSOMEIP_SD_SERVICE ||
+ (current_message_size > VSOMEIP_SOMEIP_HEADER_SIZE &&
+ current_message_size >= remaining_bytes)) {
+ its_host->on_message(&_buffer[i],
+ current_message_size, this, _destination,
+ VSOMEIP_ROUTING_CLIENT,
+ std::make_pair(ANY_UID, ANY_GID),
+ its_remote_address, its_remote_port);
+ } else {
+ //ignore messages for service discovery with shorter SomeIP length
+ VSOMEIP_ERROR << "Received an unreliable vSomeIP SD message with too short length field"
+ << " local: " << get_address_port_local()
+ << " remote: " << its_remote_address << ":" << std::dec << its_remote_port;
+ }
}
i += current_message_size;
} else {
- VSOMEIP_ERROR << "Received an unreliable vSomeIP message with bad length field";
+ VSOMEIP_ERROR << "Received an unreliable vSomeIP message with bad length field"
+ << " local: " << get_address_port_local()
+ << " remote: " << its_remote_address << ":" << std::dec << its_remote_port;
if (remaining_bytes > VSOMEIP_SERVICE_POS_MAX) {
- service_t its_service = VSOMEIP_BYTES_TO_WORD(recv_buffer_[VSOMEIP_SERVICE_POS_MIN],
- recv_buffer_[VSOMEIP_SERVICE_POS_MAX]);
+ service_t its_service = VSOMEIP_BYTES_TO_WORD(_buffer[VSOMEIP_SERVICE_POS_MIN],
+ _buffer[VSOMEIP_SERVICE_POS_MAX]);
if (its_service != VSOMEIP_SD_SERVICE) {
if (read_message_size == 0) {
VSOMEIP_ERROR << "Ignoring unreliable vSomeIP message with SomeIP message length 0!";
} else {
- its_host->on_error(&recv_buffer_[i],
- (uint32_t)remaining_bytes, this,
- its_remote_address, its_remote_port);
+ auto its_endpoint_host = endpoint_host_.lock();
+ if (its_endpoint_host) {
+ its_endpoint_host->on_error(&_buffer[i],
+ (uint32_t)remaining_bytes, this,
+ its_remote_address, its_remote_port);
+ }
}
}
}
remaining_bytes = 0;
}
} while (remaining_bytes > 0);
- receive();
- } else {
- receive();
}
}
}
-client_t udp_server_endpoint_impl::get_client(std::shared_ptr<endpoint_definition> _endpoint) {
- const endpoint_type endpoint(_endpoint->get_address(), _endpoint->get_port());
- std::lock_guard<std::mutex> its_lock(clients_mutex_);
- auto found_endpoint = endpoint_to_client_.find(endpoint);
- if (found_endpoint != endpoint_to_client_.end()) {
- // TODO: Check system byte order before convert!
- const client_t client = client_t(found_endpoint->second << 8 | found_endpoint->second >> 8);
- return client;
- }
- return 0;
-}
-
void udp_server_endpoint_impl::print_status() {
std::lock_guard<std::mutex> its_lock(mutex_);
VSOMEIP_INFO << "status use: " << std::dec << local_port_
<< " number queues: " << std::dec << queues_.size()
- << " recv_buffer: " << std::dec << recv_buffer_.capacity();
+ << " recv_buffer: " << std::dec << unicast_recv_buffer_.capacity()
+ << " multicast_recv_buffer: " << std::dec
+ << (multicast_recv_buffer_ ? multicast_recv_buffer_->capacity() : 0);
for (const auto &c : queues_) {
std::size_t its_data_size(0);
@@ -484,13 +694,24 @@ std::string udp_server_endpoint_impl::get_remote_information(
+ std::to_string(_queue_iterator->first.port());
}
+std::string udp_server_endpoint_impl::get_remote_information(
+ const endpoint_type& _remote) const {
+ boost::system::error_code ec;
+ return _remote.address().to_string(ec) + ":"
+ + std::to_string(_remote.port());
+}
+
+bool udp_server_endpoint_impl::is_reliable() const {
+ return false;
+}
+
const std::string udp_server_endpoint_impl::get_address_port_local() const {
- std::lock_guard<std::mutex> its_lock(socket_mutex_);
+ std::lock_guard<std::mutex> its_lock(unicast_socket_mutex_);
std::string its_address_port;
its_address_port.reserve(21);
boost::system::error_code ec;
- if (socket_.is_open()) {
- endpoint_type its_local_endpoint = socket_.local_endpoint(ec);
+ if (unicast_socket_.is_open()) {
+ endpoint_type its_local_endpoint = unicast_socket_.local_endpoint(ec);
if (!ec) {
its_address_port += its_local_endpoint.address().to_string(ec);
its_address_port += ":";
@@ -500,4 +721,11 @@ const std::string udp_server_endpoint_impl::get_address_port_local() const {
return its_address_port;
}
-} // namespace vsomeip
+bool udp_server_endpoint_impl::tp_segmentation_enabled(service_t _service,
+ method_t _method) const {
+ return configuration_->tp_segment_messages_service_to_client(_service,
+ local_.address().to_string(),
+ local_.port(), _method);
+}
+
+} // namespace vsomeip_v3