summaryrefslogtreecommitdiff
path: root/implementation/endpoints/src/client_endpoint_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/endpoints/src/client_endpoint_impl.cpp')
-rw-r--r--implementation/endpoints/src/client_endpoint_impl.cpp67
1 files changed, 43 insertions, 24 deletions
diff --git a/implementation/endpoints/src/client_endpoint_impl.cpp b/implementation/endpoints/src/client_endpoint_impl.cpp
index 5c2e399..6ed812a 100644
--- a/implementation/endpoints/src/client_endpoint_impl.cpp
+++ b/implementation/endpoints/src/client_endpoint_impl.cpp
@@ -6,6 +6,7 @@
#include <chrono>
#include <iomanip>
#include <sstream>
+#include <thread>
#include <boost/asio/buffer.hpp>
#include <boost/asio/ip/tcp.hpp>
@@ -13,11 +14,11 @@
#include <boost/asio/local/stream_protocol.hpp>
#include <vsomeip/defines.hpp>
-#include <vsomeip/logger.hpp>
#include "../include/client_endpoint_impl.hpp"
#include "../include/endpoint_host.hpp"
#include "../../configuration/include/internal.hpp"
+#include "../../logging/include/logger.hpp"
#include "../../utility/include/utility.hpp"
namespace vsomeip {
@@ -25,12 +26,13 @@ namespace vsomeip {
template<typename Protocol, int MaxBufferSize>
client_endpoint_impl<Protocol, MaxBufferSize>::client_endpoint_impl(
std::shared_ptr<endpoint_host> _host, endpoint_type _remote,
- boost::asio::io_service &_io)
- : endpoint_impl<MaxBufferSize>(_host, _io), socket_(_io),
- connect_timer_(_io), flush_timer_(_io), remote_(_remote),
- packetizer_(std::make_shared<message_buffer_t>()),
+ boost::asio::io_service &_io, std::uint32_t _max_message_size)
+ : endpoint_impl<MaxBufferSize>(_host, _io, _max_message_size),
+ socket_(_io), remote_(_remote),
+ flush_timer_(_io), connect_timer_(_io),
connect_timeout_(VSOMEIP_DEFAULT_CONNECT_TIMEOUT), // TODO: use config variable
- is_connected_(false) {
+ is_connected_(false),
+ packetizer_(std::make_shared<message_buffer_t>()) {
}
template<typename Protocol, int MaxBufferSize>
@@ -63,6 +65,11 @@ template<typename Protocol, int MaxBufferSize>
bool client_endpoint_impl<Protocol, MaxBufferSize>::send_to(
const std::shared_ptr<endpoint_definition> _target, const byte_t *_data,
uint32_t _size, bool _flush) {
+ (void)_target;
+ (void)_data;
+ (void)_size;
+ (void)_flush;
+
VSOMEIP_ERROR<< "Clients endpoints must not be used to "
<< "send to explicitely specified targets";
return false;
@@ -72,6 +79,7 @@ template<typename Protocol, int MaxBufferSize>
bool client_endpoint_impl<Protocol, MaxBufferSize>::send(const uint8_t *_data,
uint32_t _size, bool _flush) {
std::lock_guard<std::mutex> its_lock(mutex_);
+ bool is_flushing(false);
#if 0
std::stringstream msg;
msg << "cei::send: ";
@@ -80,8 +88,10 @@ bool client_endpoint_impl<Protocol, MaxBufferSize>::send(const uint8_t *_data,
<< (int)_data[i] << " ";
VSOMEIP_DEBUG << msg.str();
#endif
- if (packetizer_->size() + _size > MaxBufferSize) {
- send_queued(packetizer_);
+
+ if (packetizer_->size() + _size > endpoint_impl<MaxBufferSize>::max_message_size_) {
+ queue_.push_back(packetizer_);
+ is_flushing = true;
packetizer_ = std::make_shared<message_buffer_t>();
}
@@ -89,15 +99,22 @@ bool client_endpoint_impl<Protocol, MaxBufferSize>::send(const uint8_t *_data,
if (_flush) {
flush_timer_.cancel();
- send_queued(packetizer_);
+ queue_.push_back(packetizer_);
+ is_flushing = true;
packetizer_ = std::make_shared<message_buffer_t>();
} else {
flush_timer_.expires_from_now(
std::chrono::milliseconds(VSOMEIP_DEFAULT_FLUSH_TIMEOUT)); // TODO: use config variable
flush_timer_.async_wait(
- std::bind(
- &client_endpoint_impl<Protocol, MaxBufferSize>::flush_cbk,
- this->shared_from_this(), std::placeholders::_1));
+ std::bind(
+ &client_endpoint_impl<
+ Protocol, MaxBufferSize>::flush_cbk,
+ this->shared_from_this(),
+ std::placeholders::_1));
+ }
+
+ if (is_flushing && queue_.size() == 1) { // no writing in progress
+ send_queued();
}
return (true);
@@ -108,8 +125,12 @@ bool client_endpoint_impl<Protocol, MaxBufferSize>::flush() {
bool is_successful(true);
if (!packetizer_->empty()) {
- send_queued(packetizer_);
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ queue_.push_back(packetizer_);
packetizer_ = std::make_shared<message_buffer_t>();
+ if (queue_.size() == 1) { // no writing in progress
+ send_queued();
+ }
} else {
is_successful = false;
}
@@ -163,17 +184,15 @@ void client_endpoint_impl<Protocol, MaxBufferSize>::wait_connect_cbk(
template<typename Protocol, int MaxBufferSize>
void client_endpoint_impl<Protocol, MaxBufferSize>::send_cbk(
- message_buffer_ptr_t _buffer, boost::system::error_code const &_error,
- std::size_t _bytes) {
-#if 0
- std::stringstream msg;
- msg << "cei<" << this << ">::scb (" << _error.message() << "): ";
- for (std::size_t i = 0; i < _data->size(); ++i)
- msg << std::hex << std::setw(2) << std::setfill('0')
- << (int)(*_buffer)[i] << " ";
- VSOMEIP_DEBUG << msg.str();
-#endif
- if (_error == boost::asio::error::broken_pipe) {
+ boost::system::error_code const &_error, std::size_t _bytes) {
+ (void)_bytes;
+ if (!_error) {
+ std::lock_guard<std::mutex> its_lock(mutex_);
+ queue_.pop_front();
+ if (queue_.size() > 0) {
+ send_queued();
+ }
+ } else if (_error == boost::asio::error::broken_pipe) {
is_connected_ = false;
socket_.close();
connect();