diff options
Diffstat (limited to 'implementation/endpoints/src')
11 files changed, 706 insertions, 171 deletions
diff --git a/implementation/endpoints/src/client_endpoint_impl.cpp b/implementation/endpoints/src/client_endpoint_impl.cpp index b4a6bf9..2f521ea 100644 --- a/implementation/endpoints/src/client_endpoint_impl.cpp +++ b/implementation/endpoints/src/client_endpoint_impl.cpp @@ -38,10 +38,12 @@ client_endpoint_impl<Protocol>::client_endpoint_impl( flush_timer_(_io), connect_timer_(_io),
connect_timeout_(VSOMEIP_DEFAULT_CONNECT_TIMEOUT), // TODO: use config variable
state_(cei_state_e::CLOSED),
+ reconnect_counter_(0),
packetizer_(std::make_shared<message_buffer_t>()),
queue_size_(0),
was_not_connected_(false),
- local_port_(0) {
+ local_port_(0), + strand_(_io) { }
template<typename Protocol>
@@ -54,22 +56,38 @@ bool client_endpoint_impl<Protocol>::is_client() const { }
template<typename Protocol>
-bool client_endpoint_impl<Protocol>::is_connected() const {
+bool client_endpoint_impl<Protocol>::is_established() const {
return state_ == cei_state_e::ESTABLISHED;
}
template<typename Protocol>
+void client_endpoint_impl<Protocol>::set_established(bool _established) {
+ if (_established) {
+ if (state_ != cei_state_e::CONNECTING) {
+ std::lock_guard<std::mutex> its_lock(socket_mutex_);
+ if (socket_->is_open()) {
+ state_ = cei_state_e::ESTABLISHED;
+ } else {
+ state_ = cei_state_e::CLOSED;
+ }
+ }
+ } else {
+ state_ = cei_state_e::CLOSED;
+ }
}
+template<typename Protocol>
void client_endpoint_impl<Protocol>::set_connected(bool _connected) {
if (_connected) {
std::lock_guard<std::mutex> its_lock(socket_mutex_);
if (socket_->is_open()) {
- state_ = cei_state_e::ESTABLISHED;
+ state_ = cei_state_e::CONNECTED;
} else {
state_ = cei_state_e::CLOSED;
}
} else {
state_ = cei_state_e::CLOSED;
- }
}
+ }
+}
+
template<typename Protocol>
void client_endpoint_impl<Protocol>::stop() {
{
std::lock_guard<std::mutex> its_lock(mutex_);
@@ -159,7 +177,8 @@ bool client_endpoint_impl<Protocol>::flush() { template<typename Protocol>
void client_endpoint_impl<Protocol>::connect_cbk(
boost::system::error_code const &_error) {
- if (_error == boost::asio::error::operation_aborted) {
+ if (_error == boost::asio::error::operation_aborted
+ || endpoint_impl<Protocol>::sending_blocked_) {
// endpoint was stopped
shutdown_and_close_socket(false);
return;
@@ -173,7 +192,12 @@ void client_endpoint_impl<Protocol>::connect_cbk( state_ = cei_state_e::CLOSED;
its_host->on_disconnect(this->shared_from_this());
}
- start_connect_timer();
+ if (get_max_allowed_reconnects() == MAX_RECONNECTS_UNLIMITED ||
+ get_max_allowed_reconnects() >= ++reconnect_counter_) {
+ start_connect_timer();
+ } else {
+ max_allowed_reconnects_reached();
+ }
// Double the timeout as long as the maximum allowed is larger
if (connect_timeout_ < VSOMEIP_MAX_CONNECT_TIMEOUT)
connect_timeout_ = (connect_timeout_ << 1);
@@ -183,13 +207,8 @@ void client_endpoint_impl<Protocol>::connect_cbk( connect_timer_.cancel();
}
connect_timeout_ = VSOMEIP_DEFAULT_CONNECT_TIMEOUT; // TODO: use config variable
+ reconnect_counter_ = 0;
set_local_port();
- if (state_ != cei_state_e::ESTABLISHED) {
- its_host->on_connect(this->shared_from_this());
- }
-
- receive();
-
if (was_not_connected_) {
was_not_connected_ = false;
std::lock_guard<std::mutex> its_lock(mutex_);
@@ -199,6 +218,10 @@ void client_endpoint_impl<Protocol>::connect_cbk( << get_remote_information();
}
}
+ if (state_ != cei_state_e::ESTABLISHED) {
+ its_host->on_connect(this->shared_from_this());
+ }
+ receive();
}
}
}
@@ -269,8 +292,17 @@ void client_endpoint_impl<Protocol>::send_cbk( shutdown_and_close_socket(true);
connect();
} else if (_error == boost::asio::error::not_connected
- || _error == boost::asio::error::bad_descriptor) {
+ || _error == boost::asio::error::bad_descriptor
+ || _error == boost::asio::error::no_permission) {
state_ = cei_state_e::CLOSED;
+ if (_error == boost::asio::error::no_permission) {
+ VSOMEIP_WARNING << "cei::send_cbk received error: " << _error.message()
+ << " (" << std::dec << _error.value() << ") "
+ << get_remote_information();
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ queue_.clear();
+ queue_size_ = 0;
+ }
was_not_connected_ = true;
shutdown_and_close_socket(true);
connect();
@@ -332,6 +364,12 @@ template<typename Protocol> void client_endpoint_impl<Protocol>::shutdown_and_close_socket_unlocked(bool _recreate_socket) {
local_port_ = 0;
if (socket_->is_open()) {
+#ifndef _WIN32 + if (-1 == fcntl(socket_->native_handle(), F_GETFD)) { + VSOMEIP_ERROR << "cei::shutdown_and_close_socket_unlocked: socket/handle closed already '" << std::string(std::strerror(errno)) + << "' (" << errno << ") " << get_remote_information(); + } +#endif boost::system::error_code its_error;
socket_->shutdown(Protocol::socket::shutdown_both, its_error);
socket_->close(its_error);
diff --git a/implementation/endpoints/src/credentials.cpp b/implementation/endpoints/src/credentials.cpp index 84f7f08..411fd3f 100644 --- a/implementation/endpoints/src/credentials.cpp +++ b/implementation/endpoints/src/credentials.cpp @@ -17,14 +17,16 @@ namespace vsomeip { void credentials::activate_credentials(const int _fd) { int optval = 1; if (setsockopt(_fd, SOL_SOCKET, SO_PASSCRED, &optval, sizeof(optval)) == -1) { - VSOMEIP_ERROR << "Activating socket option for receiving credentials failed."; + VSOMEIP_ERROR << "vSomeIP Security: Activating socket option for receiving " + << "credentials failed."; } } void credentials::deactivate_credentials(const int _fd) { int optval = 0; if (setsockopt(_fd, SOL_SOCKET, SO_PASSCRED, &optval, sizeof(optval)) == -1) { - VSOMEIP_ERROR << "Deactivating socket option for receiving credentials failed."; + VSOMEIP_ERROR << "vSomeIP Security: Deactivating socket option for receiving " + << "credentials failed."; } } @@ -62,13 +64,13 @@ client_t credentials::receive_credentials(const int _fd, uid_t& _uid, gid_t& _gi // Receive client_id plus ancillary data ssize_t nr = recvmsg(_fd, &msgh, 0); if (nr == -1) { - VSOMEIP_ERROR << "Receiving credentials failed. No data."; + VSOMEIP_ERROR << "vSomeIP Security: Receiving credentials failed. No data."; } cmhp = CMSG_FIRSTHDR(&msgh); if (cmhp == NULL || cmhp->cmsg_len != CMSG_LEN(sizeof(struct ucred)) || cmhp->cmsg_level != SOL_SOCKET || cmhp->cmsg_type != SCM_CREDENTIALS) { - VSOMEIP_ERROR << "Receiving credentials failed. Invalid data."; + VSOMEIP_ERROR << "vSomeIP Security: Receiving credentials failed. Invalid data."; } else { ucredp = (struct ucred *) CMSG_DATA(cmhp); _uid = ucredp->uid; diff --git a/implementation/endpoints/src/local_client_endpoint_impl.cpp b/implementation/endpoints/src/local_client_endpoint_impl.cpp index 29d861f..a57c831 100644 --- a/implementation/endpoints/src/local_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/local_client_endpoint_impl.cpp @@ -60,6 +60,8 @@ void local_client_endpoint_impl::restart(bool _force) { std::lock_guard<std::mutex> its_lock(socket_mutex_);
shutdown_and_close_socket_unlocked(true);
}
+ was_not_connected_ = true;
+ reconnect_counter_ = 0;
start_connect_timer();
}
@@ -139,13 +141,14 @@ void local_client_endpoint_impl::connect() { } else {
VSOMEIP_WARNING << "local_client_endpoint::connect: Error opening socket: "
- << its_error.message();
- return;
+ << its_error.message() << " (" << std::dec << its_error.value()
+ << ")";
+ its_connect_error = its_error;
}
}
// call connect_cbk asynchronously
try {
- service_.post(
+ strand_.post( std::bind(&client_endpoint_impl::connect_cbk, shared_from_this(),
its_connect_error));
} catch (const std::exception &e) {
@@ -158,13 +161,15 @@ void local_client_endpoint_impl::receive() { if (socket_->is_open()) {
socket_->async_receive(
boost::asio::buffer(recv_buffer_),
- std::bind(
- &local_client_endpoint_impl::receive_cbk,
- std::dynamic_pointer_cast<
- local_client_endpoint_impl
- >(shared_from_this()),
- std::placeholders::_1,
- std::placeholders::_2
+ strand_.wrap( + std::bind( + &local_client_endpoint_impl::receive_cbk, + std::dynamic_pointer_cast< + local_client_endpoint_impl + >(shared_from_this()), + std::placeholders::_1, + std::placeholders::_2 + ) )
);
}
@@ -232,10 +237,6 @@ void local_client_endpoint_impl::receive_cbk( VSOMEIP_ERROR << "Local endpoint received message ("
<< _error.message() << ")";
}
- // The error handler is set only if the endpoint is hosted by the
- // routing manager. For the routing manager proxies, the corresponding
- // client endpoint (that connect to the same client) are removed
- // after the proxy has received the routing info.
error_handler_t handler;
{
std::lock_guard<std::mutex> its_lock(error_handler_mutex_);
@@ -290,6 +291,10 @@ std::string local_client_endpoint_impl::get_remote_information() const { #endif
}
+std::uint32_t local_client_endpoint_impl::get_max_allowed_reconnects() const {
+ return 13;
+}
+
bool local_client_endpoint_impl::send(const std::vector<byte_t>& _cmd_header,
const byte_t *_data, uint32_t _size,
bool _flush) {
@@ -319,4 +324,16 @@ bool local_client_endpoint_impl::send(const std::vector<byte_t>& _cmd_header, return ret;
}
+void local_client_endpoint_impl::max_allowed_reconnects_reached() {
+ VSOMEIP_ERROR << "local_client_endpoint::max_allowed_reconnects_reached: "
+ << get_remote_information();
+ error_handler_t handler;
+ {
+ std::lock_guard<std::mutex> its_lock(error_handler_mutex_);
+ handler = error_handler_;
+ }
+ if (handler)
+ handler();
+}
+
} // namespace vsomeip
diff --git a/implementation/endpoints/src/local_server_endpoint_impl.cpp b/implementation/endpoints/src/local_server_endpoint_impl.cpp index 6ae9c6c..cfd5c0b 100644 --- a/implementation/endpoints/src/local_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/local_server_endpoint_impl.cpp @@ -30,7 +30,7 @@ local_server_endpoint_impl::local_server_endpoint_impl( endpoint_type _local, boost::asio::io_service &_io, std::uint32_t _max_message_size, std::uint32_t _buffer_shrink_threshold, - configuration::endpoint_queue_limit_t _queue_limit) + configuration::endpoint_queue_limit_t _queue_limit, std::uint32_t _mode) : local_server_endpoint_base_impl(_host, _local, _io, _max_message_size, _queue_limit), acceptor_(_io), @@ -48,8 +48,11 @@ local_server_endpoint_impl::local_server_endpoint_impl( boost::asio::detail::throw_error(ec, "acceptor listen"); #ifndef _WIN32 + if (chmod(_local.path().c_str(), static_cast<mode_t>(_mode)) == -1) { + VSOMEIP_ERROR << __func__ << ": chmod: " << strerror(errno); + } if (_host->get_configuration()->is_security_enabled()) { - credentials::activate_credentials(acceptor_.native()); + credentials::activate_credentials(acceptor_.native_handle()); } #endif } @@ -60,7 +63,7 @@ local_server_endpoint_impl::local_server_endpoint_impl( std::uint32_t _max_message_size, int native_socket, std::uint32_t _buffer_shrink_threshold, - configuration::endpoint_queue_limit_t _queue_limit) + configuration::endpoint_queue_limit_t _queue_limit, std::uint32_t _mode) : local_server_endpoint_base_impl(_host, _local, _io, _max_message_size, _queue_limit), acceptor_(_io), @@ -72,8 +75,11 @@ local_server_endpoint_impl::local_server_endpoint_impl( boost::asio::detail::throw_error(ec, "acceptor assign native socket"); #ifndef _WIN32 + if (chmod(_local.path().c_str(), static_cast<mode_t>(_mode)) == -1) { + VSOMEIP_ERROR << __func__ << ": chmod: " << strerror(errno); + } if (_host->get_configuration()->is_security_enabled()) { - credentials::activate_credentials(acceptor_.native()); + credentials::activate_credentials(acceptor_.native_handle()); } #endif } @@ -127,6 +133,12 @@ void local_server_endpoint_impl::stop() { } connections_.clear(); } +#ifndef _WIN32 + { + std::lock_guard<std::mutex> its_lock(client_connections_mutex_); + client_connections_.clear(); + } +#endif } bool local_server_endpoint_impl::send_to( @@ -178,15 +190,31 @@ bool local_server_endpoint_impl::get_default_target( void local_server_endpoint_impl::remove_connection( local_server_endpoint_impl::connection *_connection) { - std::lock_guard<std::mutex> its_lock(connections_mutex_); - for (auto it = connections_.begin(); it != connections_.end();) { - if (it->second.get() == _connection) { - it = connections_.erase(it); - break; - } else { - ++it; + { + std::lock_guard<std::mutex> its_lock(connections_mutex_); + for (auto it = connections_.begin(); it != connections_.end();) { + if (it->second.get() == _connection) { + it = connections_.erase(it); + break; + } else { + ++it; + } } } + +#ifndef _WIN32 + { + std::lock_guard<std::mutex> its_lock(client_connections_mutex_); + for (auto it = client_connections_.begin(); it != client_connections_.end();) { + if (it->second.get() == _connection) { + it = client_connections_.erase(it); + break; + } else { + ++it; + } + } + } +#endif } void local_server_endpoint_impl::accept_cbk( @@ -196,23 +224,62 @@ void local_server_endpoint_impl::accept_cbk( && _error != boost::asio::error::operation_aborted && _error != boost::asio::error::no_descriptors) { start(); + } else if (_error == boost::asio::error::no_descriptors) { + VSOMEIP_ERROR << "local_server_endpoint_impl::accept_cbk: " + << _error.message() << " (" << std::dec << _error.value() + << ") Will try to accept again in 1000ms"; + std::shared_ptr<boost::asio::steady_timer> its_timer = + std::make_shared<boost::asio::steady_timer>(service_, + std::chrono::milliseconds(1000)); + auto its_ep = std::dynamic_pointer_cast<local_server_endpoint_impl>( + shared_from_this()); + its_timer->async_wait([its_timer, its_ep] + (const boost::system::error_code& _error) { + if (!_error) { + its_ep->start(); + } + }); } if (!_error) { #ifndef _WIN32 auto its_host = host_.lock(); + client_t client = 0; if (its_host) { if (its_host->get_configuration()->is_security_enabled()) { std::unique_lock<std::mutex> its_socket_lock(_connection->get_socket_lock()); socket_type &new_connection_socket = _connection->get_socket(); - uid_t uid(0); - gid_t gid(0); - client_t client = credentials::receive_credentials( + uid_t uid(0xffffffff); + gid_t gid(0xffffffff); + client = credentials::receive_credentials( new_connection_socket.native(), uid, gid); + + std::lock_guard<std::mutex> its_client_connection_lock(client_connections_mutex_); + auto found_client = client_connections_.find(client); + if (found_client != client_connections_.end()) { + VSOMEIP_WARNING << std::hex << "vSomeIP Security: Rejecting new connection with client ID 0x" << client + << " uid/gid= " << std::dec << uid << "/" << gid + << " because of already existing connection using same client ID"; + boost::system::error_code er; + new_connection_socket.shutdown(new_connection_socket.shutdown_both, er); + new_connection_socket.close(er); + return; + } + + if (!its_host->get_configuration()->check_routing_credentials(client, uid, gid)) { + VSOMEIP_WARNING << std::hex << "vSomeIP Security: Rejecting new connection with routing manager client ID 0x" << client + << " uid/gid= " << std::dec << uid << "/" << gid + << " because passed credentials do not match with routing manager credentials!"; + boost::system::error_code er; + new_connection_socket.shutdown(new_connection_socket.shutdown_both, er); + new_connection_socket.close(er); + return; + } + if (!its_host->check_credentials(client, uid, gid)) { - VSOMEIP_WARNING << std::hex << "Client 0x" << its_host->get_client() - << " received client credentials from client 0x" << client - << " which violates the security policy : uid/gid=" + VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex + << its_host->get_client() << " received client credentials from client 0x" + << client << " which violates the security policy : uid/gid=" << std::dec << uid << "/" << gid; boost::system::error_code er; new_connection_socket.shutdown(new_connection_socket.shutdown_both, er); @@ -234,8 +301,16 @@ void local_server_endpoint_impl::accept_cbk( } if (!its_error) { { - std::lock_guard<std::mutex> its_lock(connections_mutex_); - connections_[remote] = _connection; + { + std::lock_guard<std::mutex> its_lock(connections_mutex_); + connections_[remote] = _connection; + } +#ifndef _WIN32 + { + std::lock_guard<std::mutex> its_lock(client_connections_mutex_); + client_connections_[client] = _connection; + } +#endif } _connection->start(); } @@ -271,7 +346,6 @@ local_server_endpoint_impl::connection::create( std::uint32_t _buffer_shrink_threshold, boost::asio::io_service &_io_service) { const std::uint32_t its_initial_buffer_size = VSOMEIP_COMMAND_HEADER_SIZE - + VSOMEIP_MAX_LOCAL_MESSAGE_SIZE + static_cast<std::uint32_t>(sizeof(instance_t) + sizeof(bool) + sizeof(bool)); return ptr(new connection(_server, _max_message_size, its_initial_buffer_size, @@ -334,6 +408,12 @@ void local_server_endpoint_impl::connection::start() { void local_server_endpoint_impl::connection::stop() { std::lock_guard<std::mutex> its_lock(socket_mutex_); if (socket_.is_open()) { +#ifndef _WIN32 + if (-1 == fcntl(socket_.native_handle(), F_GETFD)) { + VSOMEIP_ERROR << "lse: socket/handle closed already '" << std::string(std::strerror(errno)) + << "' (" << errno << ") " << get_path_local(); + } +#endif boost::system::error_code its_error; socket_.shutdown(socket_.shutdown_both, its_error); socket_.close(its_error); @@ -487,19 +567,33 @@ void local_server_endpoint_impl::connection::receive_cbk( // start tag (4 Byte) + command (1 Byte) + client id (2 Byte) // + command size (4 Byte) + data itself + stop tag (4 byte) // = 15 Bytes not covered in command size. - if (its_command_size && its_command_size + 15 > recv_buffer_size_) { + if (its_command_size + 15 > recv_buffer_size_) { missing_capacity_ = its_command_size + 15 - std::uint32_t(recv_buffer_size_); } else if (recv_buffer_size_ < 11) { // to little data to read out the command size // minimal amount of data needed to read out command size = 11 missing_capacity_ = 11 - static_cast<std::uint32_t>(recv_buffer_size_); } else { + std::stringstream local_msg; + for (std::size_t i = its_iteration_gap; + i < recv_buffer_size_ + its_iteration_gap && + i - its_iteration_gap < 32; i++) { + local_msg << std::setw(2) << std::setfill('0') + << std::hex << (int) recv_buffer_[i] << " "; + } VSOMEIP_ERROR << "lse::c<" << this << ">rcb: recv_buffer_size is: " << std::dec << recv_buffer_size_ << " but couldn't read " "out command size. recv_buffer_capacity: " - << recv_buffer_.capacity() - << " its_iteration_gap: " << its_iteration_gap; + << std::dec << recv_buffer_.capacity() + << " its_iteration_gap: " << std::dec + << its_iteration_gap << " bound client: 0x" + << std::hex << bound_client_ << " buffer: " + << local_msg.str(); + recv_buffer_size_ = 0; + missing_capacity_ = 0; + its_iteration_gap = 0; + message_is_empty = true; } } } @@ -525,7 +619,7 @@ void local_server_endpoint_impl::connection::receive_cbk( found_message = true; its_iteration_gap = its_end + 4; } else { - if (!message_is_empty && its_iteration_gap) { + if (its_iteration_gap) { // Message not complete and not in front of the buffer! // Copy last part to front for consume in future receive_cbk call! for (size_t i = 0; i < recv_buffer_size_; ++i) { @@ -545,6 +639,7 @@ void local_server_endpoint_impl::connection::receive_cbk( || _error == boost::asio::error::connection_reset) { stop(); its_server->remove_connection(this); + its_host->get_configuration()->remove_client_to_uid_gid_mapping(bound_client_); } else if (_error != boost::asio::error::bad_descriptor) { start(); } @@ -555,6 +650,11 @@ void local_server_endpoint_impl::connection::set_bound_client(client_t _client) bound_client_ = _client; } +client_t local_server_endpoint_impl::connection::get_bound_client() const { + return bound_client_; +} + + void local_server_endpoint_impl::connection::calculate_shrink_count() { if (buffer_shrink_threshold_) { if (recv_buffer_.capacity() != recv_buffer_size_initial_) { @@ -624,6 +724,12 @@ void local_server_endpoint_impl::connection::handle_recv_buffer_exception( VSOMEIP_ERROR << its_message.str(); recv_buffer_.clear(); if (socket_.is_open()) { +#ifndef _WIN32 + if (-1 == fcntl(socket_.native_handle(), F_GETFD)) { + VSOMEIP_ERROR << "lse: socket/handle closed already '" << std::string(std::strerror(errno)) + << "' (" << errno << ") " << get_path_local(); + } +#endif boost::system::error_code its_error; socket_.shutdown(socket_.shutdown_both, its_error); socket_.close(its_error); diff --git a/implementation/endpoints/src/netlink_connector.cpp b/implementation/endpoints/src/netlink_connector.cpp index 94b75cf..2f9e2e5 100644 --- a/implementation/endpoints/src/netlink_connector.cpp +++ b/implementation/endpoints/src/netlink_connector.cpp @@ -103,7 +103,7 @@ void netlink_connector::receive_cbk(boost::system::error_code const &_error, struct nlmsghdr *nlh = (struct nlmsghdr *)&recv_buffer_[0]; while ((NLMSG_OK(nlh, len)) && (nlh->nlmsg_type != NLMSG_DONE)) { - char ifname[1024]; + char ifname[IF_NAMESIZE]; switch (nlh->nlmsg_type) { case RTM_NEWADDR: { // New Address information @@ -356,7 +356,7 @@ bool netlink_connector::check_sd_multicast_route_match(struct rtmsg* _routemsg, struct rtattr *retrta; retrta = static_cast<struct rtattr *>(RTM_RTA(_routemsg)); int if_index(0); - char if_name[1024] = "n/a"; + char if_name[IF_NAMESIZE] = "n/a"; char address[INET6_ADDRSTRLEN] = "n/a"; char gateway[INET6_ADDRSTRLEN] = "n/a"; bool matches_sd_multicast(false); diff --git a/implementation/endpoints/src/server_endpoint_impl.cpp b/implementation/endpoints/src/server_endpoint_impl.cpp index 6da5826..915bfc9 100644 --- a/implementation/endpoints/src/server_endpoint_impl.cpp +++ b/implementation/endpoints/src/server_endpoint_impl.cpp @@ -19,6 +19,7 @@ #include "../../logging/include/logger.hpp"
#include "../../utility/include/byteorder.hpp"
#include "../../utility/include/utility.hpp"
+#include "../../service_discovery/include/defines.hpp"
namespace vsomeip {
@@ -54,12 +55,17 @@ void server_endpoint_impl<Protocol>::restart(bool _force) { }
template<typename Protocol>
-bool server_endpoint_impl<Protocol>::is_connected() const {
+bool server_endpoint_impl<Protocol>::is_established() const {
return true;
}
template<typename Protocol>
-void server_endpoint_impl<Protocol>::set_connected(bool _connected) {
(void) _connected;
}
+void server_endpoint_impl<Protocol>::set_established(bool _established) {
(void) _established;
}
+
+template<typename Protocol>
+void server_endpoint_impl<Protocol>::set_connected(bool _connected) {
+ (void) _connected;
+}
template<typename Protocol>
bool server_endpoint_impl<Protocol>::send(const uint8_t *_data,
uint32_t _size, bool _flush) {
#if 0
@@ -102,6 +108,16 @@ template<typename Protocol>
bool server_endpoint_impl<Protocol>::send(const uint VSOMEIP_WARNING << "server_endpoint::send: session_id 0x"
<< std::hex << its_session
<< " not found for client 0x" << its_client;
+ const method_t its_method =
+ VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN],
+ _data[VSOMEIP_METHOD_POS_MAX]);
+ if (its_service == VSOMEIP_SD_SERVICE
+ && its_method == VSOMEIP_SD_METHOD) {
+ VSOMEIP_ERROR << "Clearing clients map as a request was "
+ "received on SD port";
+ clients_.clear();
+ is_valid_target = get_default_target(its_service, its_target);
+ }
}
} else {
is_valid_target = get_default_target(its_service, its_target);
diff --git a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp index cb92619..41254f6 100644 --- a/implementation/endpoints/src/tcp_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/tcp_client_endpoint_impl.cpp @@ -30,7 +30,9 @@ tcp_client_endpoint_impl::tcp_client_endpoint_impl( std::uint32_t _max_message_size,
std::uint32_t _buffer_shrink_threshold,
std::chrono::milliseconds _send_timeout,
- configuration::endpoint_queue_limit_t _queue_limit)
+ configuration::endpoint_queue_limit_t _queue_limit,
+ std::uint32_t _tcp_restart_aborts_max,
+ std::uint32_t _tcp_connect_time_max)
: tcp_client_endpoint_base_impl(_host, _local, _remote, _io,
_max_message_size, _queue_limit),
recv_buffer_size_initial_(VSOMEIP_SOMEIP_HEADER_SIZE),
@@ -41,7 +43,10 @@ tcp_client_endpoint_impl::tcp_client_endpoint_impl( remote_port_(_remote.port()),
last_cookie_sent_(std::chrono::steady_clock::now() - std::chrono::seconds(11)),
send_timeout_(_send_timeout),
- send_timeout_warning_(_send_timeout / 2) {
+ send_timeout_warning_(_send_timeout / 2),
+ tcp_restart_aborts_max_(_tcp_restart_aborts_max),
+ tcp_connect_time_max_(_tcp_connect_time_max),
+ aborted_restart_count_(0) {
is_supporting_magic_cookies_ = true;
}
@@ -62,7 +67,19 @@ void tcp_client_endpoint_impl::start() { void tcp_client_endpoint_impl::restart(bool _force) {
if (!_force && state_ == cei_state_e::CONNECTING) {
- return;
+ std::chrono::steady_clock::time_point its_current
+ = std::chrono::steady_clock::now();
+ long its_connect_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
+ its_current - connect_timepoint_).count();
+ if (aborted_restart_count_ < tcp_restart_aborts_max_
+ && its_connect_duration < tcp_connect_time_max_) {
+ aborted_restart_count_++;
+ return;
+ } else {
+ VSOMEIP_WARNING << "tce::restart: maximum number of aborted restarts ["
+ << tcp_restart_aborts_max_ << "] reached! its_connect_duration: "
+ << its_connect_duration;
+ }
}
state_ = cei_state_e::CONNECTING;
std::string address_port_local;
@@ -72,6 +89,8 @@ void tcp_client_endpoint_impl::restart(bool _force) { shutdown_and_close_socket_unlocked(true);
recv_buffer_ = std::make_shared<message_buffer_t>(recv_buffer_size_initial_, 0);
}
+ was_not_connected_ = true;
+ reconnect_counter_ = 0;
{
std::lock_guard<std::mutex> its_lock(mutex_);
for (const auto&m : queue_) {
@@ -150,7 +169,7 @@ void tcp_client_endpoint_impl::connect() { << " remote:" << get_address_port_remote();
try {
// don't connect on bind error to avoid using a random port
- service_.post(std::bind(&client_endpoint_impl::connect_cbk,
+ strand_.post(std::bind(&client_endpoint_impl::connect_cbk, shared_from_this(), its_bind_error));
} catch (const std::exception &e) {
VSOMEIP_ERROR << "tcp_client_endpoint_impl::connect: "
@@ -160,17 +179,23 @@ void tcp_client_endpoint_impl::connect() { }
}
state_ = cei_state_e::CONNECTING;
+ connect_timepoint_ = std::chrono::steady_clock::now();
+ aborted_restart_count_ = 0;
socket_->async_connect(
remote_,
- std::bind(
- &tcp_client_endpoint_base_impl::connect_cbk,
- shared_from_this(),
- std::placeholders::_1
+ strand_.wrap( + std::bind( + &tcp_client_endpoint_base_impl::connect_cbk, + shared_from_this(), + std::placeholders::_1 + ) )
);
} else {
VSOMEIP_WARNING << "tcp_client_endpoint::connect: Error opening socket: "
<< its_error.message() << " remote:" << get_address_port_remote();
+ strand_.post(std::bind(&tcp_client_endpoint_base_impl::connect_cbk,
+ shared_from_this(), its_error));
}
}
@@ -200,6 +225,12 @@ void tcp_client_endpoint_impl::receive(message_buffer_ptr_t _recv_buffer, if (its_capacity < its_required_capacity) {
_recv_buffer->reserve(its_required_capacity);
_recv_buffer->resize(its_required_capacity, 0x0);
+ if (_recv_buffer->size() > 1048576) {
+ VSOMEIP_INFO << "tce: recv_buffer size is: " <<
+ _recv_buffer->size()
+ << " local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote();
+ }
}
buffer_size = _missing_capacity;
} else if (buffer_shrink_threshold_
@@ -217,13 +248,15 @@ void tcp_client_endpoint_impl::receive(message_buffer_ptr_t _recv_buffer, }
socket_->async_receive(
boost::asio::buffer(&(*_recv_buffer)[_recv_buffer_size], buffer_size),
- std::bind(
- &tcp_client_endpoint_impl::receive_cbk,
- std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this()),
- std::placeholders::_1,
- std::placeholders::_2,
- _recv_buffer,
- _recv_buffer_size
+ strand_.wrap( + std::bind( + &tcp_client_endpoint_impl::receive_cbk, + std::dynamic_pointer_cast< tcp_client_endpoint_impl >(shared_from_this()), + std::placeholders::_1, + std::placeholders::_2, + _recv_buffer, + _recv_buffer_size + ) )
);
}
@@ -431,7 +464,7 @@ void tcp_client_endpoint_impl::receive_cbk( return;
}
uint32_t current_message_size = static_cast<uint32_t>(read_message_size);
- has_full_message = (current_message_size > VSOMEIP_SOMEIP_HEADER_SIZE
+ has_full_message = (current_message_size > VSOMEIP_RETURN_CODE_POS
&& current_message_size <= _recv_buffer_size);
if (has_full_message) {
bool needs_forwarding(true);
@@ -472,58 +505,125 @@ void tcp_client_endpoint_impl::receive_cbk( _recv_buffer_size -= current_message_size;
its_iteration_gap += current_message_size;
its_missing_capacity = 0;
- } else if (max_message_size_ != MESSAGE_SIZE_UNLIMITED &&
- current_message_size > max_message_size_) {
- _recv_buffer_size = 0;
- _recv_buffer->resize(recv_buffer_size_initial_, 0x0);
- _recv_buffer->shrink_to_fit();
- if (has_enabled_magic_cookies_) {
- VSOMEIP_ERROR << "Received a TCP message which exceeds "
- << "maximum message size ("
- << std::dec << current_message_size
- << "). Magic Cookies are enabled: "
- << "Resetting receiver. local: "
- << get_address_port_local() << " remote: "
- << get_address_port_remote();
- } else {
- VSOMEIP_ERROR << "Received a TCP message which exceeds "
- << "maximum message size ("
- << std::dec << current_message_size
- << ") Magic cookies are disabled: "
- << "Client will be disabled! local: "
- << get_address_port_local() << " remote: "
- << get_address_port_remote();
- return;
- }
- } else if (current_message_size > _recv_buffer_size) {
- its_missing_capacity = current_message_size
- - static_cast<std::uint32_t>(_recv_buffer_size);
- } else if (VSOMEIP_SOMEIP_HEADER_SIZE > _recv_buffer_size) {
- its_missing_capacity = VSOMEIP_SOMEIP_HEADER_SIZE
- - static_cast<std::uint32_t>(_recv_buffer_size);
} else if (has_enabled_magic_cookies_ && _recv_buffer_size > 0) {
- uint32_t its_offset = find_magic_cookie(&(*_recv_buffer)[its_iteration_gap], _recv_buffer_size);
+ const uint32_t its_offset = find_magic_cookie(
+ &(*_recv_buffer)[its_iteration_gap], _recv_buffer_size);
if (its_offset < _recv_buffer_size) {
_recv_buffer_size -= its_offset;
its_iteration_gap += its_offset;
has_full_message = true; // trigger next loop
- } else {
+ VSOMEIP_ERROR << "Detected Magic Cookie within message data."
+ << " Resyncing. local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote();
+ }
+ }
+
+ if (!has_full_message) {
+ if (_recv_buffer_size > VSOMEIP_RETURN_CODE_POS &&
+ ((*recv_buffer_)[its_iteration_gap + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION ||
+ !utility::is_valid_message_type(static_cast<message_type_e>((*recv_buffer_)[its_iteration_gap + VSOMEIP_MESSAGE_TYPE_POS])) ||
+ !utility::is_valid_return_code(static_cast<return_code_e>((*recv_buffer_)[its_iteration_gap + VSOMEIP_RETURN_CODE_POS]))
+ )) {
+ if ((*recv_buffer_)[its_iteration_gap + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION) {
+ VSOMEIP_ERROR << "tce: Wrong protocol version: 0x"
+ << std::hex << std::setw(2) << std::setfill('0')
+ << std::uint32_t((*recv_buffer_)[its_iteration_gap + VSOMEIP_PROTOCOL_VERSION_POS])
+ << " local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote();
+ // ensure to send back a message w/ wrong protocol version
+ its_lock.unlock();
+ its_host->on_message(&(*_recv_buffer)[its_iteration_gap],
+ VSOMEIP_SOMEIP_HEADER_SIZE + 8, this,
+ boost::asio::ip::address(),
+ VSOMEIP_ROUTING_CLIENT,
+ remote_address_,
+ remote_port_);
+ its_lock.lock();
+ } else if (!utility::is_valid_message_type(static_cast<message_type_e>(
+ (*recv_buffer_)[its_iteration_gap + VSOMEIP_MESSAGE_TYPE_POS]))) {
+ VSOMEIP_ERROR << "tce: Invalid message type: 0x"
+ << std::hex << std::setw(2) << std::setfill('0')
+ << std::uint32_t((*recv_buffer_)[its_iteration_gap + VSOMEIP_MESSAGE_TYPE_POS])
+ << " local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote();
+ } else if (!utility::is_valid_return_code(static_cast<return_code_e>(
+ (*recv_buffer_)[its_iteration_gap + VSOMEIP_RETURN_CODE_POS]))) {
+ VSOMEIP_ERROR << "tce: Invalid return code: 0x"
+ << std::hex << std::setw(2) << std::setfill('0')
+ << std::uint32_t((*recv_buffer_)[its_iteration_gap + VSOMEIP_RETURN_CODE_POS])
+ << " local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote();
+ }
+ state_ = cei_state_e::CONNECTING;
+ shutdown_and_close_socket_unlocked(false);
+ its_lock.unlock();
+ its_host->on_disconnect(shared_from_this());
+ restart(true);
+ return;
+ } else if (max_message_size_ != MESSAGE_SIZE_UNLIMITED &&
+ current_message_size > max_message_size_) {
+ _recv_buffer_size = 0;
+ _recv_buffer->resize(recv_buffer_size_initial_, 0x0);
+ _recv_buffer->shrink_to_fit();
+ if (has_enabled_magic_cookies_) {
+ VSOMEIP_ERROR << "Received a TCP message which exceeds "
+ << "maximum message size ("
+ << std::dec << current_message_size
+ << "). Magic Cookies are enabled: "
+ << "Resetting receiver. local: "
+ << get_address_port_local() << " remote: "
+ << get_address_port_remote();
+ } else {
+ VSOMEIP_ERROR << "Received a TCP message which exceeds "
+ << "maximum message size ("
+ << std::dec << current_message_size
+ << ") Magic cookies are disabled, "
+ << "Restarting connection. "
+ << "local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote();
+ state_ = cei_state_e::CONNECTING;
+ shutdown_and_close_socket_unlocked(false);
+ its_lock.unlock();
+ its_host->on_disconnect(shared_from_this());
+ restart(true);
+ return;
+ }
+ } else if (current_message_size > _recv_buffer_size) {
+ its_missing_capacity = current_message_size
+ - static_cast<std::uint32_t>(_recv_buffer_size);
+ } else if (VSOMEIP_SOMEIP_HEADER_SIZE > _recv_buffer_size) {
+ its_missing_capacity = VSOMEIP_SOMEIP_HEADER_SIZE
+ - static_cast<std::uint32_t>(_recv_buffer_size);
+ } else if (has_enabled_magic_cookies_ && _recv_buffer_size > 0) {
+ // no need to check for magic cookie here again: has_full_message
+ // would have been set to true if there was one present in the data
_recv_buffer_size = 0;
+ _recv_buffer->resize(recv_buffer_size_initial_, 0x0);
+ _recv_buffer->shrink_to_fit();
its_missing_capacity = 0;
+ VSOMEIP_ERROR << "tce::c<" << this
+ << ">rcb: recv_buffer_capacity: "
+ << _recv_buffer->capacity()
+ << " local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote()
+ << ". Didn't find magic cookie in broken data, trying to resync.";
+ } else {
+ VSOMEIP_ERROR << "tce::c<" << this
+ << ">rcb: recv_buffer_size is: " << std::dec
+ << _recv_buffer_size << " but couldn't read "
+ "out message_size. recv_buffer_capacity: "
+ << _recv_buffer->capacity()
+ << " its_iteration_gap: " << its_iteration_gap
+ << " local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote()
+ << ". Restarting connection due to missing/broken data TCP stream.";
+ state_ = cei_state_e::CONNECTING;
+ shutdown_and_close_socket_unlocked(false);
+ its_lock.unlock();
+ its_host->on_disconnect(shared_from_this());
+ restart(true);
+ return;
}
- } else {
- VSOMEIP_ERROR << "tce::c<" << this
- << ">rcb: recv_buffer_size is: " << std::dec
- << _recv_buffer_size << " but couldn't read "
- "out message_size. recv_buffer_capacity: "
- << _recv_buffer->capacity()
- << " its_iteration_gap: " << its_iteration_gap
- << " local: " << get_address_port_local()
- << " remote: " << get_address_port_remote()
- << ". Restarting connection due to missing/broken data TCP stream.";
- its_lock.unlock();
- restart(true);
- return;
}
} while (has_full_message && _recv_buffer_size);
if (its_iteration_gap) {
@@ -555,7 +655,6 @@ void tcp_client_endpoint_impl::receive_cbk( VSOMEIP_WARNING << "tcp_client_endpoint receive_cbk restarting."; state_ = cei_state_e::CONNECTING;
shutdown_and_close_socket_unlocked(false);
- was_not_connected_ = true;
its_lock.unlock();
its_host->on_disconnect(shared_from_this());
restart(true);
@@ -705,7 +804,6 @@ void tcp_client_endpoint_impl::send_cbk(boost::system::error_code const &_error, } else {
state_ = cei_state_e::CONNECTING;
shutdown_and_close_socket(false);
- was_not_connected_ = true;
std::shared_ptr<endpoint_host> its_host = host_.lock();
if (its_host) {
its_host->on_disconnect(shared_from_this());
@@ -742,4 +840,12 @@ void tcp_client_endpoint_impl::send_cbk(boost::system::error_code const &_error, }
}
+std::uint32_t tcp_client_endpoint_impl::get_max_allowed_reconnects() const {
+ return MAX_RECONNECTS_UNLIMITED;
+}
+
+void tcp_client_endpoint_impl::max_allowed_reconnects_reached() {
+ return;
+}
+
} // namespace vsomeip
diff --git a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp index 9c2998f..30402c5 100644 --- a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp @@ -188,6 +188,21 @@ void tcp_server_endpoint_impl::accept_cbk(connection::ptr _connection, && _error != boost::asio::error::operation_aborted && _error != boost::asio::error::no_descriptors) { start(); + } else if (_error == boost::asio::error::no_descriptors) { + VSOMEIP_ERROR<< "tcp_server_endpoint_impl::accept_cbk: " + << _error.message() << " (" << std::dec << _error.value() + << ") Will try to accept again in 1000ms"; + std::shared_ptr<boost::asio::steady_timer> its_timer = + std::make_shared<boost::asio::steady_timer>(service_, + std::chrono::milliseconds(1000)); + auto its_ep = std::dynamic_pointer_cast<tcp_server_endpoint_impl>( + shared_from_this()); + its_timer->async_wait([its_timer, its_ep] + (const boost::system::error_code& _error) { + if (!_error) { + its_ep->start(); + } + }); } } @@ -205,7 +220,7 @@ bool tcp_server_endpoint_impl::is_reliable() const { tcp_server_endpoint_impl::connection::connection( std::weak_ptr<tcp_server_endpoint_impl> _server, std::uint32_t _max_message_size, - std::uint32_t _initial_recv_buffer_size, + std::uint32_t _recv_buffer_size_initial, std::uint32_t _buffer_shrink_threshold, bool _magic_cookies_enabled, boost::asio::io_service &_io_service, @@ -213,8 +228,8 @@ tcp_server_endpoint_impl::connection::connection( socket_(_io_service), server_(_server), max_message_size_(_max_message_size), - recv_buffer_size_initial_(_initial_recv_buffer_size), - recv_buffer_(_initial_recv_buffer_size, 0), + recv_buffer_size_initial_(_recv_buffer_size_initial), + recv_buffer_(_recv_buffer_size_initial, 0), recv_buffer_size_(0), missing_capacity_(0), shrink_count_(0), @@ -235,8 +250,7 @@ tcp_server_endpoint_impl::connection::create( boost::asio::io_service & _io_service, std::chrono::milliseconds _send_timeout) { const std::uint32_t its_initial_receveive_buffer_size = - VSOMEIP_SOMEIP_HEADER_SIZE + 8 + MAGIC_COOKIE_SIZE + 8 - + VSOMEIP_MAX_TCP_MESSAGE_SIZE; + VSOMEIP_SOMEIP_HEADER_SIZE + 8 + MAGIC_COOKIE_SIZE + 8; return ptr(new connection(_server, _max_message_size, its_initial_receveive_buffer_size, _buffer_shrink_threshold, _magic_cookies_enabled, @@ -272,6 +286,12 @@ void tcp_server_endpoint_impl::connection::receive() { if (its_capacity < its_required_capacity) { recv_buffer_.reserve(its_required_capacity); recv_buffer_.resize(its_required_capacity, 0x0); + if (recv_buffer_.size() > 1048576) { + VSOMEIP_INFO << "tse: recv_buffer size is: " << + recv_buffer_.size() + << " local: " << get_address_port_local() + << " remote: " << get_address_port_remote(); + } } buffer_size = missing_capacity_; missing_capacity_ = 0; @@ -412,7 +432,7 @@ void tcp_server_endpoint_impl::connection::receive_cbk( return; } uint32_t current_message_size = static_cast<uint32_t>(read_message_size); - has_full_message = (current_message_size > VSOMEIP_SOMEIP_HEADER_SIZE + has_full_message = (current_message_size > VSOMEIP_RETURN_CODE_POS && current_message_size <= recv_buffer_size_); if (has_full_message) { bool needs_forwarding(true); @@ -424,7 +444,12 @@ void tcp_server_endpoint_impl::connection::receive_cbk( = its_server->find_magic_cookie(&recv_buffer_[its_iteration_gap], recv_buffer_size_); if (its_offset < current_message_size) { - VSOMEIP_ERROR << "Detected Magic Cookie within message data. Resyncing."; + { + std::lock_guard<std::mutex> its_lock(socket_mutex_); + VSOMEIP_ERROR << "Detected Magic Cookie within message data. Resyncing." + << " local: " << get_address_port_local() + << " remote: " << get_address_port_remote(); + } if (!is_magic_cookie(its_iteration_gap)) { its_host->on_error(&recv_buffer_[its_iteration_gap], static_cast<length_t>(recv_buffer_size_),its_server.get(), @@ -443,14 +468,16 @@ void tcp_server_endpoint_impl::connection::receive_cbk( std::memcpy(&its_client, &recv_buffer_[its_iteration_gap + VSOMEIP_CLIENT_POS_MIN], sizeof(client_t)); - session_t its_session; - std::memcpy(&its_session, - &recv_buffer_[its_iteration_gap + VSOMEIP_SESSION_POS_MIN], - sizeof(session_t)); - its_server->clients_mutex_.lock(); - its_server->clients_[its_client][its_session] = remote_; - its_server->endpoint_to_client_[remote_] = its_client; - its_server->clients_mutex_.unlock(); + if (its_client != MAGIC_COOKIE_NETWORK_BYTE_ORDER) { + session_t its_session; + std::memcpy(&its_session, + &recv_buffer_[its_iteration_gap + VSOMEIP_SESSION_POS_MIN], + sizeof(session_t)); + its_server->clients_mutex_.lock(); + its_server->clients_[its_client][its_session] = remote_; + its_server->endpoint_to_client_[remote_] = its_client; + its_server->clients_mutex_.unlock(); + } } if (!magic_cookies_enabled_) { its_host->on_message(&recv_buffer_[its_iteration_gap], @@ -478,7 +505,12 @@ void tcp_server_endpoint_impl::connection::receive_cbk( its_server->find_magic_cookie(&recv_buffer_[its_iteration_gap], recv_buffer_size_); if (its_offset < recv_buffer_size_) { - VSOMEIP_ERROR << "Detected Magic Cookie within message data. Resyncing."; + { + std::lock_guard<std::mutex> its_lock(socket_mutex_); + VSOMEIP_ERROR << "Detected Magic Cookie within message data. Resyncing." + << " local: " << get_address_port_local() + << " remote: " << get_address_port_remote(); + } if (!is_magic_cookie(its_iteration_gap)) { its_host->on_error(&recv_buffer_[its_iteration_gap], static_cast<length_t>(recv_buffer_size_), its_server.get(), @@ -496,7 +528,53 @@ void tcp_server_endpoint_impl::connection::receive_cbk( } if (!has_full_message) { - if (max_message_size_ != MESSAGE_SIZE_UNLIMITED + if (recv_buffer_size_ > VSOMEIP_RETURN_CODE_POS && + (recv_buffer_[its_iteration_gap + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION || + !utility::is_valid_message_type(static_cast<message_type_e>(recv_buffer_[its_iteration_gap + VSOMEIP_MESSAGE_TYPE_POS])) || + !utility::is_valid_return_code(static_cast<return_code_e>(recv_buffer_[its_iteration_gap + VSOMEIP_RETURN_CODE_POS])) + )) { + if (recv_buffer_[its_iteration_gap + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION) { + { + std::lock_guard<std::mutex> its_lock(socket_mutex_); + VSOMEIP_ERROR << "tse: Wrong protocol version: 0x" + << std::hex << std::setw(2) << std::setfill('0') + << std::uint32_t(recv_buffer_[its_iteration_gap + VSOMEIP_PROTOCOL_VERSION_POS]) + << " local: " << get_address_port_local() + << " remote: " << get_address_port_remote() + << ". Closing connection due to missing/broken data TCP stream."; + } + // ensure to send back a error message w/ wrong protocol version + its_host->on_message(&recv_buffer_[its_iteration_gap], + VSOMEIP_SOMEIP_HEADER_SIZE + 8, its_server.get(), + boost::asio::ip::address(), + VSOMEIP_ROUTING_CLIENT, + remote_address_, remote_port_); + } else if (!utility::is_valid_message_type(static_cast<message_type_e>( + recv_buffer_[its_iteration_gap + VSOMEIP_MESSAGE_TYPE_POS]))) { + std::lock_guard<std::mutex> its_lock(socket_mutex_); + VSOMEIP_ERROR << "tse: Invalid message type: 0x" + << std::hex << std::setw(2) << std::setfill('0') + << std::uint32_t(recv_buffer_[its_iteration_gap + VSOMEIP_MESSAGE_TYPE_POS]) + << " local: " << get_address_port_local() + << " remote: " << get_address_port_remote() + << ". Closing connection due to missing/broken data TCP stream."; + } else if (!utility::is_valid_return_code(static_cast<return_code_e>( + recv_buffer_[its_iteration_gap + VSOMEIP_RETURN_CODE_POS]))) { + std::lock_guard<std::mutex> its_lock(socket_mutex_); + VSOMEIP_ERROR << "tse: Invalid return code: 0x" + << std::hex << std::setw(2) << std::setfill('0') + << std::uint32_t(recv_buffer_[its_iteration_gap + VSOMEIP_RETURN_CODE_POS]) + << " local: " << get_address_port_local() + << " remote: " << get_address_port_remote() + << ". Closing connection due to missing/broken data TCP stream."; + } + { + std::lock_guard<std::mutex> its_lock(its_server->connections_mutex_); + stop(); + } + its_server->remove_connection(this); + return; + } else if (max_message_size_ != MESSAGE_SIZE_UNLIMITED && current_message_size > max_message_size_) { std::lock_guard<std::mutex> its_lock(socket_mutex_); recv_buffer_size_ = 0; @@ -517,9 +595,14 @@ void tcp_server_endpoint_impl::connection::receive_cbk( << std::dec << current_message_size << " > " << std::dec << max_message_size_ << ") Magic cookies are disabled: " - << "Connection will be disabled! local: " + << "Connection will be closed! local: " << get_address_port_local() << " remote: " << get_address_port_remote(); + { + std::lock_guard<std::mutex> its_lock(its_server->connections_mutex_); + stop(); + } + its_server->remove_connection(this); return; } } else if (current_message_size > recv_buffer_size_) { @@ -528,6 +611,18 @@ void tcp_server_endpoint_impl::connection::receive_cbk( } else if (VSOMEIP_SOMEIP_HEADER_SIZE > recv_buffer_size_) { missing_capacity_ = VSOMEIP_SOMEIP_HEADER_SIZE - static_cast<std::uint32_t>(recv_buffer_size_); + } else if (magic_cookies_enabled_ && recv_buffer_size_ > 0) { + // no need to check for magic cookie here again: has_full_message + // would have been set to true if there was one present in the data + recv_buffer_size_ = 0; + recv_buffer_.resize(recv_buffer_size_initial_, 0x0); + recv_buffer_.shrink_to_fit(); + missing_capacity_ = 0; + std::lock_guard<std::mutex> its_lock(socket_mutex_); + VSOMEIP_ERROR << "Didn't find magic cookie in broken" + << " data, trying to resync." + << " local: " << get_address_port_local() + << " remote: " << get_address_port_remote(); } else { { std::lock_guard<std::mutex> its_lock(socket_mutex_); diff --git a/implementation/endpoints/src/udp_client_endpoint_impl.cpp b/implementation/endpoints/src/udp_client_endpoint_impl.cpp index 7be55f4..67c9703 100644 --- a/implementation/endpoints/src/udp_client_endpoint_impl.cpp +++ b/implementation/endpoints/src/udp_client_endpoint_impl.cpp @@ -20,12 +20,13 @@ udp_client_endpoint_impl::udp_client_endpoint_impl( endpoint_type _local,
endpoint_type _remote,
boost::asio::io_service &_io,
- configuration::endpoint_queue_limit_t _queue_limit)
+ configuration::endpoint_queue_limit_t _queue_limit,
+ std::uint32_t _udp_receive_buffer_size)
: udp_client_endpoint_base_impl(_host, _local, _remote, _io,
VSOMEIP_MAX_UDP_MESSAGE_SIZE, _queue_limit),
- recv_buffer_(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0),
remote_address_(_remote.address()),
- remote_port_(_remote.port()) {
+ remote_port_(_remote.port()),
+ udp_receive_buffer_size_(_udp_receive_buffer_size) {
}
udp_client_endpoint_impl::~udp_client_endpoint_impl() {
@@ -53,6 +54,26 @@ void udp_client_endpoint_impl::connect() { << "SO_REUSEADDR: " << its_error.message() << " remote:"
<< get_address_port_remote();
}
+ socket_->set_option(boost::asio::socket_base::receive_buffer_size(
+ udp_receive_buffer_size_), its_error);
+ if (its_error) {
+ VSOMEIP_WARNING << "udp_client_endpoint_impl::connect: couldn't set "
+ << "SO_RCVBUF: " << its_error.message() << " to: "
+ << std::dec << udp_receive_buffer_size_ << " remote:"
+ << get_address_port_remote();
+ } else {
+ boost::asio::socket_base::receive_buffer_size its_option;
+ socket_->get_option(its_option, its_error);
+ if (its_error) {
+ VSOMEIP_WARNING << "udp_client_endpoint_impl::connect: couldn't get "
+ << "SO_RCVBUF: " << its_error.message() << " remote:"
+ << get_address_port_remote();
+ } else {
+ VSOMEIP_INFO << "udp_client_endpoint_impl::connect: SO_RCVBUF is: "
+ << std::dec << its_option.value();
+ }
+ }
+
// In case a client endpoint port was configured,
// bind to it before connecting
if (local_.port() != ILLEGAL_PORT) {
@@ -64,7 +85,7 @@ void udp_client_endpoint_impl::connect() { << " remote:" << get_address_port_remote();
try {
// don't connect on bind error to avoid using a random port
- service_.post(std::bind(&client_endpoint_impl::connect_cbk,
+ strand_.post(std::bind(&client_endpoint_impl::connect_cbk, shared_from_this(), its_bind_error));
} catch (const std::exception &e) {
VSOMEIP_ERROR << "udp_client_endpoint_impl::connect: "
@@ -76,15 +97,19 @@ void udp_client_endpoint_impl::connect() { state_ = cei_state_e::CONNECTING;
socket_->async_connect(
remote_,
- std::bind(
- &udp_client_endpoint_base_impl::connect_cbk,
- shared_from_this(),
- std::placeholders::_1
+ strand_.wrap( + std::bind( + &udp_client_endpoint_base_impl::connect_cbk, + shared_from_this(), + std::placeholders::_1 + ) )
);
} else {
VSOMEIP_WARNING << "udp_client_endpoint::connect: Error opening socket: "
<< its_error.message() << " remote:" << get_address_port_remote();
+ strand_.post(std::bind(&udp_client_endpoint_base_impl::connect_cbk,
+ shared_from_this(), its_error));
}
}
@@ -107,6 +132,8 @@ void udp_client_endpoint_impl::restart(bool _force) { local = get_address_port_local();
}
shutdown_and_close_socket(false);
+ was_not_connected_ = true;
+ reconnect_counter_ = 0;
VSOMEIP_WARNING << "uce::restart: local: " << local
<< " remote: " << get_address_port_remote();
start_connect_timer();
@@ -148,16 +175,20 @@ void udp_client_endpoint_impl::receive() { if (!socket_->is_open()) {
return;
}
+ message_buffer_ptr_t its_buffer = std::make_shared<message_buffer_t>(VSOMEIP_MAX_UDP_MESSAGE_SIZE);
socket_->async_receive_from(
- boost::asio::buffer(&recv_buffer_[0], max_message_size_),
+ boost::asio::buffer(*its_buffer),
remote_,
- std::bind(
- &udp_client_endpoint_impl::receive_cbk,
- std::dynamic_pointer_cast<
- udp_client_endpoint_impl
- >(shared_from_this()),
- std::placeholders::_1,
- std::placeholders::_2
+ strand_.wrap(
+ std::bind(
+ &udp_client_endpoint_impl::receive_cbk,
+ std::dynamic_pointer_cast<
+ udp_client_endpoint_impl
+ >(shared_from_this()),
+ std::placeholders::_1,
+ std::placeholders::_2,
+ its_buffer
+ )
)
);
}
@@ -190,7 +221,8 @@ std::uint16_t udp_client_endpoint_impl::get_remote_port() const { }
void udp_client_endpoint_impl::receive_cbk(
- boost::system::error_code const &_error, std::size_t _bytes) {
+ boost::system::error_code const &_error, std::size_t _bytes,
+ message_buffer_ptr_t _recv_buffer) {
if (_error == boost::asio::error::operation_aborted) {
// endpoint was stopped
return;
@@ -202,7 +234,7 @@ void udp_client_endpoint_impl::receive_cbk( msg << "ucei::rcb(" << _error.message() << "): ";
for (std::size_t i = 0; i < _bytes + recv_buffer_size_; ++i)
msg << std::hex << std::setw(2) << std::setfill('0')
- << (int) recv_buffer_[i] << " ";
+ << (int) (*_recv_buffer)[i] << " ";
VSOMEIP_INFO << msg.str();
#endif
std::size_t remaining_bytes = _bytes;
@@ -210,7 +242,7 @@ void udp_client_endpoint_impl::receive_cbk( do {
uint64_t read_message_size
- = utility::get_message_size(&this->recv_buffer_[i],
+ = utility::get_message_size(&(*_recv_buffer)[i],
remaining_bytes);
if (read_message_size > MESSAGE_SIZE_UNLIMITED) {
VSOMEIP_ERROR << "Message size exceeds allowed maximum!";
@@ -222,10 +254,45 @@ void udp_client_endpoint_impl::receive_cbk( if (remaining_bytes - current_message_size > remaining_bytes) {
VSOMEIP_ERROR << "buffer underflow in udp client endpoint ~> abort!";
return;
+ } else if (current_message_size > VSOMEIP_RETURN_CODE_POS &&
+ ((*_recv_buffer)[i + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION ||
+ !utility::is_valid_message_type(static_cast<message_type_e>((*_recv_buffer)[i + VSOMEIP_MESSAGE_TYPE_POS])) ||
+ !utility::is_valid_return_code(static_cast<return_code_e>((*_recv_buffer)[i + VSOMEIP_RETURN_CODE_POS]))
+ )) {
+ if ((*_recv_buffer)[i + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION) {
+ VSOMEIP_ERROR << "uce: Wrong protocol version: 0x"
+ << std::hex << std::setw(2) << std::setfill('0')
+ << std::uint32_t((*_recv_buffer)[i + VSOMEIP_PROTOCOL_VERSION_POS])
+ << " local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote();
+ // ensure to send back a message w/ wrong protocol version
+ its_host->on_message(&(*_recv_buffer)[i],
+ VSOMEIP_SOMEIP_HEADER_SIZE + 8, this,
+ boost::asio::ip::address(),
+ VSOMEIP_ROUTING_CLIENT,
+ remote_address_,
+ remote_port_);
+ } else if (!utility::is_valid_message_type(static_cast<message_type_e>(
+ (*_recv_buffer)[i + VSOMEIP_MESSAGE_TYPE_POS]))) {
+ VSOMEIP_ERROR << "uce: Invalid message type: 0x"
+ << std::hex << std::setw(2) << std::setfill('0')
+ << std::uint32_t((*_recv_buffer)[i + VSOMEIP_MESSAGE_TYPE_POS])
+ << " local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote();
+ } else if (!utility::is_valid_return_code(static_cast<return_code_e>(
+ (*_recv_buffer)[i + VSOMEIP_RETURN_CODE_POS]))) {
+ VSOMEIP_ERROR << "uce: Invalid return code: 0x"
+ << std::hex << std::setw(2) << std::setfill('0')
+ << std::uint32_t((*_recv_buffer)[i + VSOMEIP_RETURN_CODE_POS])
+ << " local: " << get_address_port_local()
+ << " remote: " << get_address_port_remote();
+ }
+ receive();
+ return;
}
remaining_bytes -= current_message_size;
- its_host->on_message(&recv_buffer_[i], current_message_size,
+ its_host->on_message(&(*_recv_buffer)[i], current_message_size,
this, boost::asio::ip::address(),
VSOMEIP_ROUTING_CLIENT, remote_address_,
remote_port_);
@@ -304,4 +371,12 @@ std::string udp_client_endpoint_impl::get_remote_information() const { + std::to_string(remote_.port());
}
+std::uint32_t udp_client_endpoint_impl::get_max_allowed_reconnects() const {
+ return MAX_RECONNECTS_UNLIMITED;
+}
+
+void udp_client_endpoint_impl::max_allowed_reconnects_reached() {
+ return;
+}
+
} // namespace vsomeip
diff --git a/implementation/endpoints/src/udp_server_endpoint_impl.cpp b/implementation/endpoints/src/udp_server_endpoint_impl.cpp index 6da5b32..6ad7ce8 100644 --- a/implementation/endpoints/src/udp_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/udp_server_endpoint_impl.cpp @@ -8,6 +8,8 @@ #include <boost/asio/ip/multicast.hpp> +#include <vsomeip/constants.hpp> + #include "../include/endpoint_definition.hpp" #include "../include/endpoint_host.hpp" #include "../include/udp_server_endpoint_impl.hpp" @@ -25,7 +27,8 @@ udp_server_endpoint_impl::udp_server_endpoint_impl( std::shared_ptr< endpoint_host > _host, endpoint_type _local, boost::asio::io_service &_io, - configuration::endpoint_queue_limit_t _queue_limit) + configuration::endpoint_queue_limit_t _queue_limit, + std::uint32_t _udp_receive_buffer_size) : server_endpoint_impl<ip::udp_ext>( _host, _local, _io, VSOMEIP_MAX_UDP_MESSAGE_SIZE, _queue_limit), socket_(_io, _local.protocol()), @@ -60,6 +63,27 @@ udp_server_endpoint_impl::udp_server_endpoint_impl( socket_.set_option(option, ec); boost::asio::detail::throw_error(ec, "broadcast option"); + socket_.set_option(boost::asio::socket_base::receive_buffer_size( + _udp_receive_buffer_size), ec); + if (ec) { + VSOMEIP_WARNING << "udp_server_endpoint_impl:: couldn't set " + << "SO_RCVBUF: " << ec.message() << " to: " << std::dec + << _udp_receive_buffer_size << " local port: " << std::dec + << local_port_; + } else { + boost::asio::socket_base::receive_buffer_size its_option; + socket_.get_option(its_option, ec); + if (ec) { + VSOMEIP_WARNING << "udp_server_endpoint_impl: couldn't get " + << "SO_RCVBUF: " << ec.message() << " local port:" + << std::dec << local_port_; + } else { + VSOMEIP_INFO << "udp_server_endpoint_impl: SO_RCVBUF is: " + << std::dec << its_option.value(); + } + } + + #ifdef _WIN32 const char* optval("0001"); ::setsockopt(socket_.native(), IPPROTO_IP, IP_PKTINFO, @@ -167,8 +191,7 @@ bool udp_server_endpoint_impl::is_joined( void udp_server_endpoint_impl::join(const std::string &_address) { bool has_received(false); - std::function<void(const std::string &)> join_func = - [this](const std::string &_address) { + auto join_func = [this](const std::string &_address) { try { bool is_v4(false); bool is_v6(false); @@ -318,6 +341,41 @@ void udp_server_endpoint_impl::receive_cbk( if (remaining_bytes - current_message_size > remaining_bytes) { VSOMEIP_ERROR << "buffer underflow in udp client endpoint ~> abort!"; return; + } else if (current_message_size > VSOMEIP_RETURN_CODE_POS && + (recv_buffer_[i + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION || + !utility::is_valid_message_type(static_cast<message_type_e>(recv_buffer_[i + VSOMEIP_MESSAGE_TYPE_POS])) || + !utility::is_valid_return_code(static_cast<return_code_e>(recv_buffer_[i + VSOMEIP_RETURN_CODE_POS])) + )) { + if (recv_buffer_[i + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION) { + VSOMEIP_ERROR << "use: Wrong protocol version: 0x" + << std::hex << std::setw(2) << std::setfill('0') + << std::uint32_t(recv_buffer_[i + VSOMEIP_PROTOCOL_VERSION_POS]) + << " local: " << get_address_port_local() + << " remote: " << its_remote_address << ":" << std::dec << its_remote_port; + // ensure to send back a message w/ wrong protocol version + its_host->on_message(&recv_buffer_[i], + VSOMEIP_SOMEIP_HEADER_SIZE + 8, this, + _destination, + VSOMEIP_ROUTING_CLIENT, + its_remote_address, + its_remote_port); + } else if (!utility::is_valid_message_type(static_cast<message_type_e>( + recv_buffer_[i + VSOMEIP_MESSAGE_TYPE_POS]))) { + VSOMEIP_ERROR << "use: Invalid message type: 0x" + << std::hex << std::setw(2) << std::setfill('0') + << std::uint32_t(recv_buffer_[i + VSOMEIP_MESSAGE_TYPE_POS]) + << " local: " << get_address_port_local() + << " remote: " << its_remote_address << ":" << std::dec << its_remote_port; + } else if (!utility::is_valid_return_code(static_cast<return_code_e>( + recv_buffer_[i + VSOMEIP_RETURN_CODE_POS]))) { + VSOMEIP_ERROR << "use: Invalid return code: 0x" + << std::hex << std::setw(2) << std::setfill('0') + << std::uint32_t(recv_buffer_[i + VSOMEIP_RETURN_CODE_POS]) + << " local: " << get_address_port_local() + << " remote: " << its_remote_address << ":" << std::dec << its_remote_port; + } + receive(); + return; } remaining_bytes -= current_message_size; service_t its_service = VSOMEIP_BYTES_TO_WORD(recv_buffer_[i + VSOMEIP_SERVICE_POS_MIN], @@ -328,14 +386,16 @@ void udp_server_endpoint_impl::receive_cbk( std::memcpy(&its_client, &recv_buffer_[i + VSOMEIP_CLIENT_POS_MIN], sizeof(client_t)); - session_t its_session; - std::memcpy(&its_session, - &recv_buffer_[i + VSOMEIP_SESSION_POS_MIN], - sizeof(session_t)); - clients_mutex_.lock(); - clients_[its_client][its_session] = remote_; - endpoint_to_client_[remote_] = its_client; - clients_mutex_.unlock(); + if (its_client != MAGIC_COOKIE_NETWORK_BYTE_ORDER) { + session_t its_session; + std::memcpy(&its_session, + &recv_buffer_[i + VSOMEIP_SESSION_POS_MIN], + sizeof(session_t)); + clients_mutex_.lock(); + clients_[its_client][its_session] = remote_; + endpoint_to_client_[remote_] = its_client; + clients_mutex_.unlock(); + } } else if (its_service != VSOMEIP_SD_SERVICE && utility::is_notification(recv_buffer_[i + VSOMEIP_MESSAGE_TYPE_POS]) && joined_group_) { @@ -424,4 +484,20 @@ std::string udp_server_endpoint_impl::get_remote_information( + std::to_string(_queue_iterator->first.port()); } +const std::string udp_server_endpoint_impl::get_address_port_local() const { + std::lock_guard<std::mutex> its_lock(socket_mutex_); + std::string its_address_port; + its_address_port.reserve(21); + boost::system::error_code ec; + if (socket_.is_open()) { + endpoint_type its_local_endpoint = socket_.local_endpoint(ec); + if (!ec) { + its_address_port += its_local_endpoint.address().to_string(ec); + its_address_port += ":"; + its_address_port += std::to_string(its_local_endpoint.port()); + } + } + return its_address_port; +} + } // namespace vsomeip diff --git a/implementation/endpoints/src/virtual_server_endpoint_impl.cpp b/implementation/endpoints/src/virtual_server_endpoint_impl.cpp index 30862df..20d40d7 100644 --- a/implementation/endpoints/src/virtual_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/virtual_server_endpoint_impl.cpp @@ -24,10 +24,14 @@ void virtual_server_endpoint_impl::start() { void virtual_server_endpoint_impl::stop() { } -bool virtual_server_endpoint_impl::is_connected() const { +bool virtual_server_endpoint_impl::is_established() const { return false; } +void virtual_server_endpoint_impl::set_established(bool _established) { + (void) _established; +} + void virtual_server_endpoint_impl::set_connected(bool _connected) { (void) _connected; } |