summaryrefslogtreecommitdiff
path: root/implementation/endpoints/src/local_tcp_client_endpoint_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/endpoints/src/local_tcp_client_endpoint_impl.cpp')
-rw-r--r--implementation/endpoints/src/local_tcp_client_endpoint_impl.cpp382
1 files changed, 382 insertions, 0 deletions
diff --git a/implementation/endpoints/src/local_tcp_client_endpoint_impl.cpp b/implementation/endpoints/src/local_tcp_client_endpoint_impl.cpp
new file mode 100644
index 0000000..e728827
--- /dev/null
+++ b/implementation/endpoints/src/local_tcp_client_endpoint_impl.cpp
@@ -0,0 +1,382 @@
+// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <iomanip>
+#include <sstream>
+
+#include <boost/asio/write.hpp>
+
+#include <vsomeip/defines.hpp>
+#include <vsomeip/internal/logger.hpp>
+
+#include "../include/endpoint_host.hpp"
+#include "../include/local_tcp_client_endpoint_impl.hpp"
+#include "../include/local_tcp_server_endpoint_impl.hpp"
+#include "../../protocol/include/protocol.hpp"
+#include "../../routing/include/routing_host.hpp"
+
+namespace vsomeip_v3 {
+
+local_tcp_client_endpoint_impl::local_tcp_client_endpoint_impl(
+ const std::shared_ptr<endpoint_host> &_endpoint_host,
+ const std::shared_ptr<routing_host> &_routing_host,
+ const endpoint_type & _local,
+ const endpoint_type &_remote,
+ boost::asio::io_context &_io,
+ const std::shared_ptr<configuration> &_configuration)
+ : local_tcp_client_endpoint_base_impl(_endpoint_host, _routing_host,
+ _local, _remote, _io,
+ _configuration->get_max_message_size_local(),
+ _configuration->get_endpoint_queue_limit_local(),
+ _configuration),
+ recv_buffer_(VSOMEIP_LOCAL_CLIENT_ENDPOINT_RECV_BUFFER_SIZE, 0) {
+
+ is_supporting_magic_cookies_ = false;
+}
+
+local_tcp_client_endpoint_impl::~local_tcp_client_endpoint_impl() {
+
+}
+
+bool local_tcp_client_endpoint_impl::is_local() const {
+
+ return true;
+}
+
+void local_tcp_client_endpoint_impl::restart(bool _force) {
+
+ if (!_force && state_ == cei_state_e::CONNECTING) {
+ return;
+ }
+ state_ = cei_state_e::CONNECTING;
+ {
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ sending_blocked_ = false;
+ queue_.clear();
+ queue_size_ = 0;
+ }
+ {
+ std::lock_guard<std::mutex> its_lock(socket_mutex_);
+ shutdown_and_close_socket_unlocked(true);
+ }
+ was_not_connected_ = true;
+ reconnect_counter_ = 0;
+ start_connect_timer();
+}
+
+void local_tcp_client_endpoint_impl::start() {
+
+ {
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ sending_blocked_ = false;
+ }
+ connect();
+}
+
+void local_tcp_client_endpoint_impl::stop() {
+ {
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ sending_blocked_ = true;
+ }
+ {
+ std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
+ boost::system::error_code ec;
+ connect_timer_.cancel(ec);
+ }
+ connect_timeout_ = VSOMEIP_DEFAULT_CONNECT_TIMEOUT;
+
+ bool is_open(false);
+ {
+ std::lock_guard<std::mutex> its_lock(socket_mutex_);
+ is_open = socket_->is_open();
+ }
+ if (is_open) {
+ bool send_queue_empty(false);
+ std::uint32_t times_slept(0);
+
+ while (times_slept <= 50) {
+ mutex_.lock();
+ send_queue_empty = (queue_.size() == 0);
+ mutex_.unlock();
+ if (send_queue_empty) {
+ break;
+ } else {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ times_slept++;
+ }
+ }
+ }
+ shutdown_and_close_socket(false);
+}
+
+void local_tcp_client_endpoint_impl::connect() {
+ start_connecting_timer();
+ boost::system::error_code its_connect_error;
+ {
+ std::lock_guard<std::mutex> its_lock(socket_mutex_);
+ boost::system::error_code its_error;
+ socket_->open(remote_.protocol(), its_error);
+
+ if (!its_error || its_error == boost::asio::error::already_open) {
+ socket_->set_option(boost::asio::socket_base::reuse_address(true), its_error);
+ if (its_error) {
+ VSOMEIP_WARNING << "local_tcp_client_endpoint_impl::" << __func__
+ << ": Cannot enable SO_REUSEADDR"
+ << "(" << its_error.message() << ")";
+ }
+
+ socket_->bind(local_, its_error);
+ if (its_error) {
+ VSOMEIP_WARNING << "local_tcp_client_endpoint_impl::" << __func__
+ << ": Cannot bind to client port " << local_.port()
+ << "(" << its_error.message() << ")";
+ }
+
+ state_ = cei_state_e::CONNECTING;
+ socket_->connect(remote_, its_connect_error);
+ } else {
+ VSOMEIP_WARNING << "local_client_endpoint::connect: Error opening socket: "
+ << its_error.message() << " (" << std::dec << its_error.value()
+ << ")";
+ its_connect_error = its_error;
+ }
+ }
+ // call connect_cbk asynchronously
+ try {
+ strand_.post(
+ std::bind(&client_endpoint_impl::connect_cbk, shared_from_this(),
+ its_connect_error));
+ } catch (const std::exception &e) {
+ VSOMEIP_ERROR << "local_client_endpoint_impl::connect: " << e.what();
+ }
+}
+
+void local_tcp_client_endpoint_impl::receive() {
+ std::lock_guard<std::mutex> its_lock(socket_mutex_);
+ if (socket_->is_open()) {
+ socket_->async_receive(
+ boost::asio::buffer(recv_buffer_),
+ strand_.wrap(
+ std::bind(
+ &local_tcp_client_endpoint_impl::receive_cbk,
+ std::dynamic_pointer_cast<
+ local_tcp_client_endpoint_impl
+ >(shared_from_this()),
+ std::placeholders::_1,
+ std::placeholders::_2
+ )
+ )
+ );
+ }
+}
+
+// this overrides client_endpoint_impl::send to disable the pull method
+// for local communication
+bool local_tcp_client_endpoint_impl::send(const uint8_t *_data, uint32_t _size) {
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ bool ret(true);
+ const bool queue_size_zero_on_entry(queue_.empty());
+ if (endpoint_impl::sending_blocked_ ||
+ check_message_size(nullptr, _size) != cms_ret_e::MSG_OK ||
+ !check_packetizer_space(_size) ||
+ !check_queue_limit(_data, _size)) {
+ ret = false;
+ } else {
+#if 0
+ std::stringstream msg;
+ msg << "lce::send: ";
+ for (uint32_t i = 0; i < _size; i++)
+ msg << std::hex << std::setw(2) << std::setfill('0')
+ << (int)_data[i] << " ";
+ VSOMEIP_INFO << msg.str();
+#endif
+ train_->buffer_->insert(train_->buffer_->end(), _data, _data + _size);
+ queue_train(train_, queue_size_zero_on_entry);
+ train_->buffer_ = std::make_shared<message_buffer_t>();
+ }
+ return ret;
+}
+
+void local_tcp_client_endpoint_impl::send_queued(std::pair<message_buffer_ptr_t, uint32_t> &_entry) {
+
+ static const byte_t its_start_tag[] = { 0x67, 0x37, 0x6D, 0x07 };
+ static const byte_t its_end_tag[] = { 0x07, 0x6D, 0x37, 0x67 };
+ std::vector<boost::asio::const_buffer> bufs;
+
+ bufs.push_back(boost::asio::buffer(its_start_tag));
+ bufs.push_back(boost::asio::buffer(*_entry.first));
+ bufs.push_back(boost::asio::buffer(its_end_tag));
+
+ {
+ std::lock_guard<std::mutex> its_lock(socket_mutex_);
+ boost::asio::async_write(
+ *socket_,
+ bufs,
+ std::bind(
+ &client_endpoint_impl::send_cbk,
+ std::dynamic_pointer_cast<
+ local_tcp_client_endpoint_impl
+ >(shared_from_this()),
+ std::placeholders::_1,
+ std::placeholders::_2,
+ _entry.first
+ )
+ );
+ }
+}
+
+void local_tcp_client_endpoint_impl::get_configured_times_from_endpoint(
+ service_t _service, method_t _method,
+ std::chrono::nanoseconds *_debouncing,
+ std::chrono::nanoseconds *_maximum_retention) const {
+
+ (void)_service;
+ (void)_method;
+ (void)_debouncing;
+ (void)_maximum_retention;
+ VSOMEIP_ERROR << "local_client_endpoint_impl::get_configured_times_from_endpoint called.";
+}
+
+void local_tcp_client_endpoint_impl::send_magic_cookie() {
+
+}
+
+void local_tcp_client_endpoint_impl::receive_cbk(
+ boost::system::error_code const &_error, std::size_t _bytes) {
+
+ if (_error) {
+ if (_error == boost::asio::error::operation_aborted) {
+ // endpoint was stopped
+ return;
+ } else if (_error == boost::asio::error::connection_reset
+ || _error == boost::asio::error::eof
+ || _error == boost::asio::error::bad_descriptor) {
+ VSOMEIP_INFO << __func__ << " local_tcp_client_endpoint:"
+ " connection_reset/EOF/bad_descriptor";
+ } else if (_error) {
+ VSOMEIP_ERROR << "Local endpoint received message ("
+ << _error.message() << ")";
+ }
+ error_handler_t handler;
+ {
+ std::lock_guard<std::mutex> its_lock(error_handler_mutex_);
+ handler = error_handler_;
+ }
+ if (handler)
+ handler();
+ } else {
+
+#if 0
+ std::stringstream msg;
+ msg << "lce<" << this << ">::recv: ";
+ for (std::size_t i = 0; i < recv_buffer_.size(); i++)
+ msg << std::setw(2) << std::setfill('0') << std::hex
+ << (int)recv_buffer_[i] << " ";
+ VSOMEIP_INFO << msg.str();
+#endif
+
+ // We only handle a single message here. Check whether the message
+ // format matches what we do expect.
+ // TODO: Replace the magic numbers.
+ if (_bytes == VSOMEIP_LOCAL_CLIENT_ENDPOINT_RECV_BUFFER_SIZE
+ && recv_buffer_[0] == 0x67 && recv_buffer_[1] == 0x37
+ && recv_buffer_[2] == 0x6d && recv_buffer_[3] == 0x07
+ && recv_buffer_[4] == byte_t(protocol::id_e::ASSIGN_CLIENT_ACK_ID)
+ && recv_buffer_[15] == 0x07 && recv_buffer_[16] == 0x6d
+ && recv_buffer_[17] == 0x37 && recv_buffer_[18] == 0x67) {
+
+ auto its_routing_host = routing_host_.lock();
+ if (its_routing_host)
+ its_routing_host->on_message(&recv_buffer_[4],
+ static_cast<length_t>(recv_buffer_.size() - 8), this);
+ }
+
+ receive();
+ }
+}
+
+bool local_tcp_client_endpoint_impl::get_remote_address(
+ boost::asio::ip::address &_address) const {
+ (void)_address;
+ return false;
+}
+
+std::uint16_t local_tcp_client_endpoint_impl::get_remote_port() const {
+ return 0;
+}
+
+void local_tcp_client_endpoint_impl::set_local_port() {
+ // local_port_ is set to zero in ctor of client_endpoint_impl -> do nothing
+}
+
+void local_tcp_client_endpoint_impl::print_status() {
+
+ std::string its_path("");
+ std::size_t its_data_size(0);
+ std::size_t its_queue_size(0);
+ {
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ its_queue_size = queue_.size();
+ its_data_size = queue_size_;
+ }
+
+ VSOMEIP_INFO << "status lce: " << its_path << " queue: "
+ << its_queue_size << " data: " << its_data_size;
+}
+
+std::string local_tcp_client_endpoint_impl::get_remote_information() const {
+
+ boost::system::error_code ec;
+ return remote_.address().to_string(ec) + ":"
+ + std::to_string(remote_.port());
+}
+
+
+bool local_tcp_client_endpoint_impl::check_packetizer_space(std::uint32_t _size) {
+ if (train_->buffer_->size() + _size < train_->buffer_->size()) {
+ VSOMEIP_ERROR << "Overflow in packetizer addition ~> abort sending!";
+ return false;
+ }
+ if (train_->buffer_->size() + _size > max_message_size_
+ && !train_->buffer_->empty()) {
+ queue_.push_back(std::make_pair(train_->buffer_, 0));
+ queue_size_ += train_->buffer_->size();
+ train_->buffer_ = std::make_shared<message_buffer_t>();
+ }
+ return true;
+}
+
+bool local_tcp_client_endpoint_impl::is_reliable() const {
+
+ return true;
+}
+
+std::uint32_t local_tcp_client_endpoint_impl::get_max_allowed_reconnects() const {
+
+ return (MAX_RECONNECTS_UNLIMITED);
+}
+
+bool local_tcp_client_endpoint_impl::tp_segmentation_enabled(
+ service_t _service, method_t _method) const {
+
+ (void)_service;
+ (void)_method;
+ return false;
+}
+
+void local_tcp_client_endpoint_impl::max_allowed_reconnects_reached() {
+
+ VSOMEIP_ERROR << "local_client_endpoint::max_allowed_reconnects_reached: "
+ << get_remote_information();
+ error_handler_t handler;
+ {
+ std::lock_guard<std::mutex> its_lock(error_handler_mutex_);
+ handler = error_handler_;
+ }
+ if (handler)
+ handler();
+}
+
+} // namespace vsomeip_v3