diff options
Diffstat (limited to 'implementation/endpoints/src/tcp_server_endpoint_impl.cpp')
-rw-r--r-- | implementation/endpoints/src/tcp_server_endpoint_impl.cpp | 124 |
1 files changed, 98 insertions, 26 deletions
diff --git a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp index 0084d8e..fc31850 100644 --- a/implementation/endpoints/src/tcp_server_endpoint_impl.cpp +++ b/implementation/endpoints/src/tcp_server_endpoint_impl.cpp @@ -1,4 +1,4 @@ -// Copyright (C) 2014-2015 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// Copyright (C) 2014-2016 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. @@ -46,28 +46,61 @@ void tcp_server_endpoint_impl::start() { } void tcp_server_endpoint_impl::stop() { + server_endpoint_impl::stop(); for (auto& i : connections_) i.second->stop(); - acceptor_.close(); + if(acceptor_.is_open()) { + boost::system::error_code its_error; + acceptor_.close(its_error); + } } bool tcp_server_endpoint_impl::send_to( const std::shared_ptr<endpoint_definition> _target, const byte_t *_data, uint32_t _size, bool _flush) { + std::lock_guard<std::mutex> its_lock(mutex_); endpoint_type its_target(_target->get_address(), _target->get_port()); return send_intern(its_target, _data, _size, _flush); } void tcp_server_endpoint_impl::send_queued(queue_iterator_type _queue_iterator) { auto connection_iterator = connections_.find(_queue_iterator->first); - if (connection_iterator != connections_.end()) + if (connection_iterator != connections_.end()) { connection_iterator->second->send_queued(_queue_iterator); + } else { + VSOMEIP_DEBUG << "Didn't find connection: " + << _queue_iterator->first.address().to_string() << ":" << std::dec + << static_cast<std::uint16_t>(_queue_iterator->first.port()) + << " dropping message."; + _queue_iterator->second.pop_front(); + } +} + +bool tcp_server_endpoint_impl::is_established(std::shared_ptr<endpoint_definition> _endpoint) { + bool is_connected = false; + endpoint_type endpoint(_endpoint->get_address(), _endpoint->get_port()); + auto connection_iterator = connections_.find(endpoint); + if (connection_iterator != connections_.end()) { +#if 0 + VSOMEIP_DEBUG << "tcp_server_endpoint_impl::is_established(): subscribers TCP connection for " + << endpoint.address().to_string() << ":" << std::dec + << static_cast<std::uint16_t>(endpoint.port()) + << " is established!" ; +#endif + is_connected = true; + } else { + VSOMEIP_DEBUG << "Didn't find TCP connection: Subscription rejected for: " + << endpoint.address().to_string() << ":" << std::dec + << static_cast<std::uint16_t>(endpoint.port()); + } + return is_connected; } tcp_server_endpoint_impl::endpoint_type tcp_server_endpoint_impl::get_remote() const { - return current_->get_socket().remote_endpoint(); + boost::system::error_code its_error; + return current_->get_socket().remote_endpoint(its_error); } bool tcp_server_endpoint_impl::get_remote_address( @@ -77,9 +110,7 @@ bool tcp_server_endpoint_impl::get_remote_address( boost::system::error_code its_error; tcp_server_endpoint_impl::endpoint_type its_endpoint = current_->get_socket().remote_endpoint(its_error); - if (its_error) { - return false; - } else { + if (!its_error) { boost::asio::ip::address its_address = its_endpoint.address(); if (!its_address.is_unspecified()) { _address = its_address; @@ -90,7 +121,19 @@ bool tcp_server_endpoint_impl::get_remote_address( return false; } -bool tcp_server_endpoint_impl::get_multicast(service_t, event_t, +unsigned short tcp_server_endpoint_impl::get_remote_port() const { + if (current_) { + boost::system::error_code its_error; + tcp_server_endpoint_impl::endpoint_type its_endpoint = + current_->get_socket().remote_endpoint(its_error); + if (!its_error) { + return its_endpoint.port(); + } + } + return 0; +} + +bool tcp_server_endpoint_impl::get_default_target(service_t, tcp_server_endpoint_impl::endpoint_type &) const { return false; } @@ -100,17 +143,23 @@ void tcp_server_endpoint_impl::accept_cbk(connection::ptr _connection, if (!_error) { socket_type &new_connection_socket = _connection->get_socket(); - endpoint_type remote = new_connection_socket.remote_endpoint(); - - connections_[remote] = _connection; - _connection->start(); - + boost::system::error_code its_error; + endpoint_type remote = new_connection_socket.remote_endpoint(its_error); + if(!its_error) { + connections_[remote] = _connection; + _connection->start(); + } + } + if (_error != boost::asio::error::operation_aborted) { start(); + } else { + VSOMEIP_DEBUG << "Endpoint was stopped, don't starting again"; } } unsigned short tcp_server_endpoint_impl::get_local_port() const { - return acceptor_.local_endpoint().port(); + boost::system::error_code its_error; + return acceptor_.local_endpoint(its_error).port(); } bool tcp_server_endpoint_impl::is_reliable() const { @@ -163,8 +212,10 @@ void tcp_server_endpoint_impl::connection::receive() { void tcp_server_endpoint_impl::connection::stop() { std::lock_guard<std::mutex> its_lock(stop_mutex_); if(socket_.is_open()) { - socket_.shutdown(socket_.shutdown_both); - socket_.close(); + boost::system::error_code its_shutdown_error; + socket_.shutdown(socket_.shutdown_both, its_shutdown_error); + boost::system::error_code its_close_error; + socket_.close(its_close_error); } } @@ -172,8 +223,9 @@ void tcp_server_endpoint_impl::connection::send_queued( queue_iterator_type _queue_iterator) { message_buffer_ptr_t its_buffer = _queue_iterator->second.front(); - if (server_->has_enabled_magic_cookies_) + if (server_->has_enabled_magic_cookies_) { send_magic_cookie(its_buffer); + } boost::asio::async_write(socket_, boost::asio::buffer(*its_buffer), std::bind(&tcp_server_endpoint_base_impl::send_cbk, @@ -240,20 +292,27 @@ void tcp_server_endpoint_impl::connection::receive_cbk( } } if (needs_forwarding) { - if (utility::is_request(recv_buffer_[VSOMEIP_MESSAGE_TYPE_POS])) { + if (utility::is_request( + recv_buffer_[its_iteration_gap + + VSOMEIP_MESSAGE_TYPE_POS])) { client_t its_client; std::memcpy(&its_client, - &recv_buffer_[VSOMEIP_CLIENT_POS_MIN], + &recv_buffer_[its_iteration_gap + VSOMEIP_CLIENT_POS_MIN], sizeof(client_t)); session_t its_session; std::memcpy(&its_session, - &recv_buffer_[VSOMEIP_SESSION_POS_MIN], + &recv_buffer_[its_iteration_gap + VSOMEIP_SESSION_POS_MIN], sizeof(session_t)); { std::lock_guard<std::mutex> its_lock(stop_mutex_); if (socket_.is_open()) { - server_->clients_[its_client][its_session] = - socket_.remote_endpoint(); + server_->clients_mutex_.lock(); + boost::system::error_code its_error; + endpoint_type its_endpoint(socket_.remote_endpoint(its_error)); + if (!its_error) { + server_->clients_[its_client][its_session] = its_endpoint; + } + server_->clients_mutex_.unlock(); server_->current_ = this; } } @@ -290,10 +349,22 @@ void tcp_server_endpoint_impl::connection::receive_cbk( } } } else if (current_message_size > max_message_size_) { - VSOMEIP_ERROR << "Message exceeds maximum message size (" - << std::dec << current_message_size - << "). Resetting receiver."; - recv_buffer_size_ = 0; + if (server_->has_enabled_magic_cookies_) { + VSOMEIP_ERROR << "Received a TCP message which exceeds " + << "maximum message size (" + << std::dec << current_message_size + << "). Magic Cookies are enabled: " + << "Resetting receiver."; + recv_buffer_size_ = 0; + } else { + VSOMEIP_ERROR << "Received a TCP message which exceeds " + << "maximum message size (" + << std::dec << current_message_size + << ") Magic cookies are disabled: " + << "Connection will be disabled!"; + recv_buffer_size_ = 0; + return; + } } } while (has_full_message && recv_buffer_size_); if (its_iteration_gap) { @@ -317,6 +388,7 @@ client_t tcp_server_endpoint_impl::get_client(std::shared_ptr<endpoint_definitio } client_t tcp_server_endpoint_impl::connection::get_client(endpoint_type _endpoint_type) { + std::lock_guard<std::mutex> its_lock(server_->clients_mutex_); for (auto its_client : server_->clients_) { for (auto its_session : server_->clients_[its_client.first]) { auto endpoint = its_session.second; |