diff options
Diffstat (limited to 'src/components/transport_manager/src/cloud/websocket_client_connection.cc')
-rw-r--r-- | src/components/transport_manager/src/cloud/websocket_client_connection.cc | 358 |
1 files changed, 358 insertions, 0 deletions
diff --git a/src/components/transport_manager/src/cloud/websocket_client_connection.cc b/src/components/transport_manager/src/cloud/websocket_client_connection.cc new file mode 100644 index 0000000000..ec2fb0bcfb --- /dev/null +++ b/src/components/transport_manager/src/cloud/websocket_client_connection.cc @@ -0,0 +1,358 @@ +/* + * + * Copyright (c) 2018, Livio + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following + * disclaimer in the documentation and/or other materials provided with the + * distribution. + * + * Neither the name of the Ford Motor Company nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "transport_manager/cloud/websocket_client_connection.h" +#include "transport_manager/cloud/cloud_device.h" + +#include "transport_manager/transport_adapter/transport_adapter_controller.h" + +#include "utils/logger.h" + +namespace transport_manager { +namespace transport_adapter { +CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager") + +WebsocketClientConnection::WebsocketClientConnection( + const DeviceUID& device_uid, + const ApplicationHandle& app_handle, + TransportAdapterController* controller) + : controller_(controller) + , resolver_(ioc_) + , ws_(ioc_) +#ifdef ENABLE_SECURITY + , ctx_(ssl::context::sslv23_client) + , wss_(ioc_, ctx_) +#endif // ENABLE_SECURITY + , shutdown_(false) + , thread_delegate_(new LoopThreadDelegate(&message_queue_, this)) + , write_thread_(threads::CreateThread("WS Async Send", thread_delegate_)) + , device_uid_(device_uid) + , app_handle_(app_handle) + , io_pool_(1) { +} + +WebsocketClientConnection::~WebsocketClientConnection() { + ioc_.stop(); + io_pool_.join(); +} + +#ifdef ENABLE_SECURITY +void WebsocketClientConnection::AddCertificateAuthority( + const std::string cert, boost::system::error_code& ec) { + ctx_.add_certificate_authority(boost::asio::buffer(cert.data(), cert.size()), + ec); + if (ec) { + return; + } + + wss_.next_layer().set_verify_mode(ssl::verify_peer); +} +#endif // ENABLE_SECURITY + +TransportAdapter::Error WebsocketClientConnection::Start() { + LOG4CXX_AUTO_TRACE(logger_); + DeviceSptr device = controller_->FindDevice(device_uid_); + CloudDevice* cloud_device = static_cast<CloudDevice*>(device.get()); + CloudWebsocketTransportAdapter* cloud_ta = + static_cast<CloudWebsocketTransportAdapter*>(controller_); + cloud_properties = cloud_ta->GetAppCloudTransportConfig(device_uid_); + auto const host = cloud_device->GetHost(); + auto const port = cloud_device->GetPort(); + boost::system::error_code ec; + + LOG4CXX_DEBUG(logger_, "Cloud app endpoint: " << cloud_properties.endpoint); + LOG4CXX_DEBUG(logger_, + "Cloud app certificate: " << cloud_properties.certificate); + LOG4CXX_DEBUG( + logger_, + "Cloud app authentication token: " << cloud_properties.auth_token); + LOG4CXX_DEBUG( + logger_, + "Cloud app transport type: " << cloud_properties.cloud_transport_type); + LOG4CXX_DEBUG(logger_, + "Cloud app hybrid app preference: " + << cloud_properties.hybrid_app_preference); + + auto const results = resolver_.resolve(host, port, ec); + if (ec) { + std::string str_err = "ErrorMessage: " + ec.message(); + LOG4CXX_ERROR(logger_, "Could not resolve host/port: " << str_err); + return TransportAdapter::FAIL; + } + + // Make Connection to host IP Address over TCP + if (cloud_properties.cloud_transport_type == "WS") { + boost::asio::connect(ws_.next_layer(), results.begin(), results.end(), ec); + } +#ifdef ENABLE_SECURITY + else if (cloud_properties.cloud_transport_type == "WSS") { + boost::asio::connect( + wss_.next_layer().next_layer(), results.begin(), results.end(), ec); + } +#endif // ENABLE_SECURITY + + if (ec) { + std::string str_err = "ErrorMessage: " + ec.message(); + LOG4CXX_ERROR(logger_, + "Could not connect to websocket: " << host << ":" << port); + LOG4CXX_ERROR(logger_, str_err); + return TransportAdapter::FAIL; + } + +#ifdef ENABLE_SECURITY + if (cloud_properties.cloud_transport_type == "WSS") { + AddCertificateAuthority(cloud_properties.certificate, ec); + + if (ec) { + std::string str_err = "ErrorMessage: " + ec.message(); + LOG4CXX_ERROR(logger_, + "Failed to add certificate authority: " + << cloud_properties.certificate); + LOG4CXX_ERROR(logger_, str_err); + Shutdown(); + return TransportAdapter::FAIL; + } + + // Perform SSL Handshake + wss_.next_layer().handshake(ssl::stream_base::client, ec); + + if (ec) { + std::string str_err = "ErrorMessage: " + ec.message(); + LOG4CXX_ERROR(logger_, + "Could not complete SSL Handshake failed with host/port: " + << host << ":" << port); + LOG4CXX_ERROR(logger_, str_err); + Shutdown(); + return TransportAdapter::FAIL; + } + } +#endif // ENABLE_SECURITY + + // Perform websocket handshake + if (cloud_properties.cloud_transport_type == "WS") { + ws_.handshake(host, cloud_device->GetTarget(), ec); + } +#ifdef ENABLE_SECURITY + else if (cloud_properties.cloud_transport_type == "WSS") { + wss_.handshake(host, cloud_device->GetTarget(), ec); + } +#endif // ENABLE_SECURITY + if (ec) { + std::string str_err = "ErrorMessage: " + ec.message(); + LOG4CXX_ERROR( + logger_, + "Could not complete handshake with host/port: " << host << ":" << port); + LOG4CXX_ERROR(logger_, str_err); + return TransportAdapter::FAIL; + } + + // Set the binary message write option + if (cloud_properties.cloud_transport_type == "WS") { + ws_.binary(true); + } +#ifdef ENABLE_SECURITY + else if (cloud_properties.cloud_transport_type == "WSS") { + wss_.binary(true); + } +#endif // ENABLE_SECURITY + write_thread_->start(threads::ThreadOptions()); + controller_->ConnectDone(device_uid_, app_handle_); + + // Start async read + if (cloud_properties.cloud_transport_type == "WS") { + ws_.async_read(buffer_, + std::bind(&WebsocketClientConnection::OnRead, + this, + std::placeholders::_1, + std::placeholders::_2)); + } +#ifdef ENABLE_SECURITY + else if (cloud_properties.cloud_transport_type == "WSS") { + wss_.async_read(buffer_, + std::bind(&WebsocketClientConnection::OnRead, + this, + std::placeholders::_1, + std::placeholders::_2)); + } +#endif // ENABLE_SECURITY + + boost::asio::post(io_pool_, [&]() { ioc_.run(); }); + + LOG4CXX_DEBUG( + logger_, + "Successfully started websocket connection @: " << host << ":" << port); + return TransportAdapter::OK; +} + +void WebsocketClientConnection::Recv(boost::system::error_code ec) { + if (shutdown_) { + return; + } + + if (ec) { + std::string str_err = "ErrorMessage: " + ec.message(); + LOG4CXX_ERROR(logger_, str_err); + Shutdown(); + return; + } + if (cloud_properties.cloud_transport_type == "WS") { + ws_.async_read(buffer_, + std::bind(&WebsocketClientConnection::OnRead, + this, + std::placeholders::_1, + std::placeholders::_2)); + } +#ifdef ENABLE_SECURITY + else if (cloud_properties.cloud_transport_type == "WSS") { + wss_.async_read(buffer_, + std::bind(&WebsocketClientConnection::OnRead, + this, + std::placeholders::_1, + std::placeholders::_2)); + } +#endif // ENABLE_SECURITY +} + +void WebsocketClientConnection::OnRead(boost::system::error_code ec, + std::size_t bytes_transferred) { + boost::ignore_unused(bytes_transferred); + if (ec) { + std::string str_err = "ErrorMessage: " + ec.message(); + LOG4CXX_ERROR(logger_, str_err); + ws_.lowest_layer().close(); + ioc_.stop(); + Shutdown(); + return; + } + std::string data_str = boost::beast::buffers_to_string(buffer_.data()); + + ssize_t size = (ssize_t)buffer_.size(); + const uint8_t* data = boost::asio::buffer_cast<const uint8_t*>( + boost::beast::buffers_front(buffer_.data())); + + ::protocol_handler::RawMessagePtr frame( + new protocol_handler::RawMessage(0, 0, data, size)); + + controller_->DataReceiveDone(device_uid_, app_handle_, frame); + + buffer_.consume(buffer_.size()); + Recv(ec); +} + +TransportAdapter::Error WebsocketClientConnection::SendData( + ::protocol_handler::RawMessagePtr message) { + LOG4CXX_AUTO_TRACE(logger_); + sync_primitives::AutoLock auto_lock(frames_to_send_mutex_); + message_queue_.push(message); + return TransportAdapter::OK; +} + +TransportAdapter::Error WebsocketClientConnection::Disconnect() { + LOG4CXX_AUTO_TRACE(logger_); + Shutdown(); + return TransportAdapter::OK; +} + +void WebsocketClientConnection::Shutdown() { + shutdown_ = true; + + if (thread_delegate_) { + thread_delegate_->SetShutdown(); + write_thread_->join(); + delete thread_delegate_; + thread_delegate_ = NULL; + threads::DeleteThread(write_thread_); + write_thread_ = NULL; + } + if (buffer_.size()) { + buffer_.consume(buffer_.size()); + } + controller_->DisconnectDone(device_uid_, app_handle_); +} + +WebsocketClientConnection::LoopThreadDelegate::LoopThreadDelegate( + MessageQueue<Message, AsyncQueue>* message_queue, + WebsocketClientConnection* handler) + : message_queue_(*message_queue), handler_(*handler), shutdown_(false) {} + +void WebsocketClientConnection::LoopThreadDelegate::threadMain() { + while (!message_queue_.IsShuttingDown() && !shutdown_) { + DrainQueue(); + message_queue_.wait(); + } + DrainQueue(); +} + +void WebsocketClientConnection::LoopThreadDelegate::exitThreadMain() { + shutdown_ = true; + if (!message_queue_.IsShuttingDown()) { + message_queue_.Shutdown(); + } +} + +void WebsocketClientConnection::LoopThreadDelegate::DrainQueue() { + while (!message_queue_.empty()) { + Message message_ptr; + message_queue_.pop(message_ptr); + if (!shutdown_) { + boost::system::error_code ec; + if (handler_.cloud_properties.cloud_transport_type == "WS") { + handler_.ws_.write( + boost::asio::buffer(message_ptr->data(), message_ptr->data_size())); + } +#ifdef ENABLE_SECURITY + else if (handler_.cloud_properties.cloud_transport_type == "WSS") { + handler_.wss_.write( + boost::asio::buffer(message_ptr->data(), message_ptr->data_size())); + } +#endif // ENABLE_SECURITY + if (ec) { + LOG4CXX_ERROR(logger_, "Error writing to websocket"); + handler_.controller_->DataSendFailed(handler_.device_uid_, + handler_.app_handle_, + message_ptr, + DataSendError()); + } + } + } +} + +void WebsocketClientConnection::LoopThreadDelegate::SetShutdown() { + shutdown_ = true; + if (!message_queue_.IsShuttingDown()) { + message_queue_.Shutdown(); + } +} + +} // namespace transport_adapter +} // namespace transport_manager |