// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

// NOTE(review): the targets of the five includes below are missing, and so
// are the template arguments further down (std::shared_ptr, std::lock_guard,
// std::vector, std::pair, std::make_shared, static_cast). This looks like
// angle-bracket contents were stripped from the file - restore them from
// version control before building; they cannot be derived from this text.
#include
#include
#include
#include
#include

#include "../include/endpoint_host.hpp"
#include "../include/local_tcp_client_endpoint_impl.hpp"
#include "../include/local_tcp_server_endpoint_impl.hpp"
#include "../../protocol/include/protocol.hpp"
#include "../../routing/include/routing_host.hpp"

namespace vsomeip_v3 {

// Constructs the endpoint by forwarding all arguments to
// local_tcp_client_endpoint_base_impl, together with the local maximum
// message size and the local endpoint queue limit read from _configuration.
// recv_buffer_ is constructed from
// (VSOMEIP_LOCAL_CLIENT_ENDPOINT_RECV_BUFFER_SIZE, 0) - presumably that many
// zero-initialized bytes; confirm against the member's declaration.
local_tcp_client_endpoint_impl::local_tcp_client_endpoint_impl(
        const std::shared_ptr &_endpoint_host,
        const std::shared_ptr &_routing_host,
        const endpoint_type & _local,
        const endpoint_type &_remote,
        boost::asio::io_context &_io,
        const std::shared_ptr &_configuration)
    : local_tcp_client_endpoint_base_impl(_endpoint_host, _routing_host, _local,
                                          _remote, _io,
                                          _configuration->get_max_message_size_local(),
                                          _configuration->get_endpoint_queue_limit_local(),
                                          _configuration),
      recv_buffer_(VSOMEIP_LOCAL_CLIENT_ENDPOINT_RECV_BUFFER_SIZE, 0) {
    // Magic cookies are not used by this endpoint (see send_magic_cookie(),
    // which is a no-op).
    is_supporting_magic_cookies_ = false;
}

// Nothing to release explicitly here.
local_tcp_client_endpoint_impl::~local_tcp_client_endpoint_impl() {
}

// This endpoint always reports itself as local.
bool local_tcp_client_endpoint_impl::is_local() const {
    return true;
}

// Drops all queued data, closes the socket and schedules a new connection
// attempt via start_connect_timer().
//
// _force: when false, the call is a no-op if the endpoint is already in
//         state CONNECTING; when true, the restart is performed regardless.
void local_tcp_client_endpoint_impl::restart(bool _force) {
    if (!_force && state_ == cei_state_e::CONNECTING) {
        return;
    }
    state_ = cei_state_e::CONNECTING;
    {
        // Unblock sending and discard everything that is still queued.
        std::lock_guard its_lock(mutex_);
        sending_blocked_ = false;
        queue_.clear();
        queue_size_ = 0;
    }
    {
        // The "_unlocked" variant is used because socket_mutex_ is already
        // held by this scope.
        std::lock_guard its_lock(socket_mutex_);
        shutdown_and_close_socket_unlocked(true);
    }
    was_not_connected_ = true;
    reconnect_counter_ = 0;
    start_connect_timer();
}

// Unblocks sending and starts a connection attempt.
void local_tcp_client_endpoint_impl::start() {
    {
        std::lock_guard its_lock(mutex_);
        sending_blocked_ = false;
    }
    connect();
}

// Stops the endpoint: blocks further sends, cancels the connect timer,
// resets the connect timeout to its default, gives an open socket a bounded
// amount of time to drain the send queue and finally closes the socket.
void local_tcp_client_endpoint_impl::stop() {
    {
        std::lock_guard its_lock(mutex_);
        sending_blocked_ = true;
    }
    {
        std::lock_guard its_lock(connect_timer_mutex_);
        // The error code from cancel() is intentionally not evaluated.
        boost::system::error_code ec;
        connect_timer_.cancel(ec);
    }
    connect_timeout_ = VSOMEIP_DEFAULT_CONNECT_TIMEOUT;

    bool is_open(false);
    {
        std::lock_guard its_lock(socket_mutex_);
        is_open = socket_->is_open();
    }
    if (is_open) {
        bool send_queue_empty(false);
        std::uint32_t times_slept(0);

        // Poll the send queue until it is empty, sleeping 10 ms between
        // checks. The loop sleeps at most 51 times (times_slept 0..50), so
        // the wait is bounded to roughly half a second.
        while (times_slept <= 50) {
            // mutex_ is held only for the size check, not during the sleep.
            mutex_.lock();
            send_queue_empty = (queue_.size() == 0);
            mutex_.unlock();
            if (send_queue_empty) {
                break;
            } else {
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
                times_slept++;
            }
        }
    }
    shutdown_and_close_socket(false);
}

// Opens the socket, binds it to local_ and connects synchronously to
// remote_. The outcome is not handled here: the resulting error code is
// handed to client_endpoint_impl::connect_cbk, which is posted to strand_.
void local_tcp_client_endpoint_impl::connect() {
    start_connecting_timer();
    boost::system::error_code its_connect_error;
    {
        std::lock_guard its_lock(socket_mutex_);
        boost::system::error_code its_error;
        socket_->open(remote_.protocol(), its_error);
        // An already open socket is not treated as a failure.
        if (!its_error || its_error == boost::asio::error::already_open) {
            // Failing to set SO_REUSEADDR or to bind is only logged; the
            // connection attempt is made anyway.
            socket_->set_option(boost::asio::socket_base::reuse_address(true), its_error);
            if (its_error) {
                VSOMEIP_WARNING << "local_tcp_client_endpoint_impl::" << __func__
                        << ": Cannot enable SO_REUSEADDR"
                        << "(" << its_error.message() << ")";
            }
            socket_->bind(local_, its_error);
            if (its_error) {
                VSOMEIP_WARNING << "local_tcp_client_endpoint_impl::" << __func__
                        << ": Cannot bind to client port " << local_.port()
                        << "(" << its_error.message() << ")";
            }
            state_ = cei_state_e::CONNECTING;
            socket_->connect(remote_, its_connect_error);
        } else {
            VSOMEIP_WARNING << "local_client_endpoint::connect: Error opening socket: "
                    << its_error.message()
                    << " (" << std::dec << its_error.value() << ")";
            // Report the open() failure through the connect callback.
            its_connect_error = its_error;
        }
    }
    // call connect_cbk asynchronously
    try {
        strand_.post(
            std::bind(&client_endpoint_impl::connect_cbk, shared_from_this(),
                      its_connect_error));
    } catch (const std::exception &e) {
        // Exceptions are logged and swallowed; in that case connect_cbk is
        // not invoked for this attempt.
        VSOMEIP_ERROR << "local_client_endpoint_impl::connect: " << e.what();
    }
}

// Starts one asynchronous receive into recv_buffer_ if the socket is open.
// Completion is delivered to receive_cbk, wrapped through strand_. Does
// nothing when the socket is closed. socket_mutex_ is held for the duration
// of the call.
void local_tcp_client_endpoint_impl::receive() {
    std::lock_guard its_lock(socket_mutex_);
    if (socket_->is_open()) {
        socket_->async_receive(
            boost::asio::buffer(recv_buffer_),
            strand_.wrap(
                std::bind(
                    &local_tcp_client_endpoint_impl::receive_cbk,
                    std::dynamic_pointer_cast<
                        local_tcp_client_endpoint_impl
                    >(shared_from_this()),
                    std::placeholders::_1,
                    std::placeholders::_2
                )
            )
        );
    }
}

// this overrides client_endpoint_impl::send to disable the pull method
// for local communication
//
// Appends _size bytes from _data to the current train buffer, queues the
// train and replaces the train buffer with a fresh one.
//
// Returns false (and queues nothing for _data) when sending is blocked or
// when any of check_message_size / check_packetizer_space /
// check_queue_limit rejects the message; true otherwise. The checks are
// short-circuited in the order written. mutex_ is held for the whole call.
bool local_tcp_client_endpoint_impl::send(const uint8_t *_data, uint32_t _size) {
    std::lock_guard its_lock(mutex_);
    bool ret(true);
    // Captured before the checks below, because check_packetizer_space()
    // may itself push a buffer onto queue_.
    const bool queue_size_zero_on_entry(queue_.empty());
    if (endpoint_impl::sending_blocked_ ||
        check_message_size(nullptr, _size) != cms_ret_e::MSG_OK ||
        !check_packetizer_space(_size) ||
        !check_queue_limit(_data, _size)) {
        ret = false;
    } else {
#if 0
        std::stringstream msg;
        msg << "lce::send: ";
        for (uint32_t i = 0; i < _size; i++)
            msg << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " ";
        VSOMEIP_INFO << msg.str();
#endif
        train_->buffer_->insert(train_->buffer_->end(), _data, _data + _size);
        queue_train(train_, queue_size_zero_on_entry);
        train_->buffer_ = std::make_shared();
    }
    return ret;
}

// Writes one queued entry to the socket asynchronously, framed by a 4-byte
// start tag and a 4-byte end tag (the end tag is the start tag reversed).
// Completion is reported to client_endpoint_impl::send_cbk, which also
// receives _entry.first - presumably so the payload buffer stays alive until
// the write has finished; verify against send_cbk.
void local_tcp_client_endpoint_impl::send_queued(std::pair &_entry) {

    static const byte_t its_start_tag[] = { 0x67, 0x37, 0x6D, 0x07 };
    static const byte_t its_end_tag[] = { 0x07, 0x6D, 0x37, 0x67 };
    // Gather list: start tag, payload, end tag.
    std::vector bufs;

    bufs.push_back(boost::asio::buffer(its_start_tag));
    bufs.push_back(boost::asio::buffer(*_entry.first));
    bufs.push_back(boost::asio::buffer(its_end_tag));

    {
        std::lock_guard its_lock(socket_mutex_);
        boost::asio::async_write(
            *socket_,
            bufs,
            std::bind(
                &client_endpoint_impl::send_cbk,
                std::dynamic_pointer_cast<
                    local_tcp_client_endpoint_impl
                >(shared_from_this()),
                std::placeholders::_1,
                std::placeholders::_2,
                _entry.first
            )
        );
    }
}

// Not supported by this endpoint: all arguments are ignored, the output
// parameters are left untouched and an error is logged on every call.
void local_tcp_client_endpoint_impl::get_configured_times_from_endpoint(
        service_t _service, method_t _method,
        std::chrono::nanoseconds *_debouncing,
        std::chrono::nanoseconds *_maximum_retention) const {

    (void)_service;
    (void)_method;
    (void)_debouncing;
    (void)_maximum_retention;
    VSOMEIP_ERROR << "local_client_endpoint_impl::get_configured_times_from_endpoint called.";
}

// Intentionally empty: the constructor sets is_supporting_magic_cookies_ to
// false.
void local_tcp_client_endpoint_impl::send_magic_cookie() {
}

// Completion handler for the async_receive started in receive().
//
// _error: result of the receive operation.
// _bytes: number of bytes received into recv_buffer_.
//
// On operation_aborted the function returns silently. On any other error it
// logs, invokes the registered error handler (if any) and does NOT start
// another receive. On success it forwards a well-formed message to the
// routing host and re-arms the receive.
void local_tcp_client_endpoint_impl::receive_cbk(
        boost::system::error_code const &_error, std::size_t _bytes) {

    if (_error) {
        if (_error == boost::asio::error::operation_aborted) {
            // endpoint was stopped
            return;
        } else if (_error == boost::asio::error::connection_reset
                || _error == boost::asio::error::eof
                || _error == boost::asio::error::bad_descriptor) {
            VSOMEIP_INFO << __func__ << " local_tcp_client_endpoint:"
                    " connection_reset/EOF/bad_descriptor";
        } else if (_error) {
            // _error is already known to be set here, so this branch is the
            // catch-all for every other error.
            VSOMEIP_ERROR << "Local endpoint received message ("
                    << _error.message() << ")";
        }
        // Copy the handler under its mutex, then call it without holding
        // the lock.
        error_handler_t handler;
        {
            std::lock_guard its_lock(error_handler_mutex_);
            handler = error_handler_;
        }
        if (handler)
            handler();
    } else {

#if 0
        std::stringstream msg;
        msg << "lce<" << this << ">::recv: ";
        for (std::size_t i = 0; i < recv_buffer_.size(); i++)
            msg << std::setw(2) << std::setfill('0') << std::hex
                << (int)recv_buffer_[i] << " ";
        VSOMEIP_INFO << msg.str();
#endif

        // We only handle a single message here. Check whether the message
        // format matches what we do expect.
        // TODO: Replace the magic numbers.
        //
        // Expected layout: bytes 0..3 equal the start tag and bytes 15..18
        // equal the end tag used by send_queued(); byte 4 must be
        // ASSIGN_CLIENT_ACK_ID. Anything else - including a receive of fewer
        // bytes than the full buffer size - is dropped without logging.
        // NOTE(review): indexing up to 18 presumes
        // VSOMEIP_LOCAL_CLIENT_ENDPOINT_RECV_BUFFER_SIZE is at least 19 -
        // confirm against its definition.
        if (_bytes == VSOMEIP_LOCAL_CLIENT_ENDPOINT_RECV_BUFFER_SIZE
                && recv_buffer_[0] == 0x67 && recv_buffer_[1] == 0x37
                && recv_buffer_[2] == 0x6d && recv_buffer_[3] == 0x07
                && recv_buffer_[4] == byte_t(protocol::id_e::ASSIGN_CLIENT_ACK_ID)
                && recv_buffer_[15] == 0x07 && recv_buffer_[16] == 0x6d
                && recv_buffer_[17] == 0x37 && recv_buffer_[18] == 0x67) {

            auto its_routing_host = routing_host_.lock();
            if (its_routing_host)
                // Pass the payload without the two 4-byte tags: start at
                // index 4, length is the buffer size minus 8.
                its_routing_host->on_message(&recv_buffer_[4],
                        static_cast(recv_buffer_.size() - 8), this);
        }

        // Re-arm the receive regardless of whether the message was accepted.
        receive();
    }
}

// No remote address is reported by this endpoint: _address is left
// untouched and false is returned.
bool local_tcp_client_endpoint_impl::get_remote_address(
        boost::asio::ip::address &_address) const {

    (void)_address;
    return false;
}

// Always reports port 0.
std::uint16_t local_tcp_client_endpoint_impl::get_remote_port() const {

    return 0;
}

void local_tcp_client_endpoint_impl::set_local_port() {
    // local_port_ is set to zero in ctor of client_endpoint_impl -> do nothing
}

// Logs the number of queued entries and the queued data size. Both values
// are read under mutex_; the log statement runs after the lock is released.
void local_tcp_client_endpoint_impl::print_status() {

    // its_path is always the empty string here, so the "status lce:" field
    // of the log line is blank.
    std::string its_path("");
    std::size_t its_data_size(0);
    std::size_t its_queue_size(0);

    {
        std::lock_guard its_lock(mutex_);
        its_queue_size = queue_.size();
        its_data_size = queue_size_;
    }

    VSOMEIP_INFO << "status lce: " << its_path << " queue: "
            << its_queue_size << " data: " << its_data_size;
}

// Returns the remote endpoint formatted as "<address>:<port>".
std::string local_tcp_client_endpoint_impl::get_remote_information() const {

    // ec receives any conversion error from to_string(); it is not checked.
    boost::system::error_code ec;
    return remote_.address().to_string(ec) + ":"
            + std::to_string(remote_.port());
}

// Makes room in the train buffer for _size additional bytes.
//
// Returns false (after logging) if adding _size to the current buffer size
// wraps around; true otherwise. If the addition would exceed
// max_message_size_ and the buffer is not empty, the current buffer is moved
// to queue_ and replaced with a fresh one.
// NOTE(review): no lock is taken here; the only caller visible in this file
// (send()) holds mutex_ - confirm for any other callers.
bool local_tcp_client_endpoint_impl::check_packetizer_space(std::uint32_t _size) {

    // Wrap-around check for the unsigned addition.
    if (train_->buffer_->size() + _size < train_->buffer_->size()) {
        VSOMEIP_ERROR << "Overflow in packetizer addition ~> abort sending!";
        return false;
    }
    if (train_->buffer_->size() + _size > max_message_size_
            && !train_->buffer_->empty()) {
        // Flush the current buffer to the queue (paired with the value 0)
        // and account for its size.
        queue_.push_back(std::make_pair(train_->buffer_, 0));
        queue_size_ += train_->buffer_->size();
        train_->buffer_ = std::make_shared();
    }
    return true;
}

// This endpoint always reports itself as reliable.
bool local_tcp_client_endpoint_impl::is_reliable() const {

    return true;
}

// Reconnect attempts are not limited for this endpoint.
std::uint32_t local_tcp_client_endpoint_impl::get_max_allowed_reconnects() const {

    return (MAX_RECONNECTS_UNLIMITED);
}

// Always false for this endpoint; both arguments are ignored.
bool local_tcp_client_endpoint_impl::tp_segmentation_enabled(
        service_t _service, method_t _method) const {

    (void)_service;
    (void)_method;
    return false;
}

// Logs an error that names the remote endpoint and then invokes the
// registered error handler, if one is set.
void local_tcp_client_endpoint_impl::max_allowed_reconnects_reached() {

    VSOMEIP_ERROR << "local_client_endpoint::max_allowed_reconnects_reached: "
            << get_remote_information();
    // Copy the handler under its mutex, then call it without holding the
    // lock.
    error_handler_t handler;
    {
        std::lock_guard its_lock(error_handler_mutex_);
        handler = error_handler_;
    }
    if (handler)
        handler();
}

} // namespace vsomeip_v3