From cbc9d3928fb3c4a8db6608e84bad67d2398a52e7 Mon Sep 17 00:00:00 2001 From: Shobhit Adlakha Date: Thu, 31 Mar 2022 07:57:23 -0400 Subject: Fix/WSS re-connection loop (#3891) * Remove shutdown call from failed ssl handshake and add call to teardown failed connection * Reset websocket stream on failed ssl handshake * Add function to reset websocket stream in case of failed handshakes --- .../cloud/websocket_client_connection.h | 7 +- .../src/cloud/websocket_client_connection.cc | 103 +++++++++++++-------- 2 files changed, 71 insertions(+), 39 deletions(-) (limited to 'src') diff --git a/src/components/transport_manager/include/transport_manager/cloud/websocket_client_connection.h b/src/components/transport_manager/include/transport_manager/cloud/websocket_client_connection.h index 0465effa94..afc040f749 100644 --- a/src/components/transport_manager/include/transport_manager/cloud/websocket_client_connection.h +++ b/src/components/transport_manager/include/transport_manager/cloud/websocket_client_connection.h @@ -144,16 +144,19 @@ class WebsocketClientConnection void OnRead(boost::system::error_code ec, std::size_t bytes_transferred); private: + void ResetWebsocketStream(std::string cloud_transport_type); + TransportAdapterController* controller_; boost::asio::io_context ioc_; tcp::resolver resolver_; boost::beast::flat_buffer buffer_; std::string host_; std::string text_; - WS ws_; + std::shared_ptr ws_; #ifdef ENABLE_SECURITY + const char* wss_ciphers_; ssl::context ctx_; - WSS wss_; + std::shared_ptr wss_; #endif // ENABLE_SECURITY std::atomic_bool shutdown_; diff --git a/src/components/transport_manager/src/cloud/websocket_client_connection.cc b/src/components/transport_manager/src/cloud/websocket_client_connection.cc index 6a28c6a7ff..c1274699f7 100644 --- a/src/components/transport_manager/src/cloud/websocket_client_connection.cc +++ b/src/components/transport_manager/src/cloud/websocket_client_connection.cc @@ -48,24 +48,25 @@ WebsocketClientConnection::WebsocketClientConnection( TransportAdapterController* controller) : controller_(controller) , resolver_(ioc_) - , ws_(ioc_) + , ws_(std::make_shared(ioc_)) #ifdef ENABLE_SECURITY , ctx_(ssl::context::tlsv12_client) - , wss_(ioc_, ctx_) + , wss_(std::make_shared(ioc_, ctx_)) #endif // ENABLE_SECURITY , shutdown_(false) , thread_delegate_(new LoopThreadDelegate(&message_queue_, this)) - , write_thread_(threads::CreateThread("WS Async Send", thread_delegate_)) + , write_thread_( + threads::CreateThread("Client WS Async Send", thread_delegate_)) , device_uid_(device_uid) , app_handle_(app_handle) , io_pool_(1) { #ifdef ENABLE_SECURITY - const char* wss_ciphers = + wss_ciphers_ = "ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-" "CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:ECDHE-ECDSA-AES128-GCM-" "SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-" "AES256-SHA384:ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA256"; - SSL_CTX_set_cipher_list(ctx_.native_handle(), wss_ciphers); + SSL_CTX_set_cipher_list(ctx_.native_handle(), wss_ciphers_); #endif } @@ -83,7 +84,7 @@ void WebsocketClientConnection::AddCertificateAuthority( return; } - wss_.next_layer().set_verify_mode(ssl::verify_peer); + wss_->next_layer().set_verify_mode(ssl::verify_peer); } #endif // ENABLE_SECURITY @@ -116,12 +117,12 @@ TransportAdapter::Error WebsocketClientConnection::Start() { // Make Connection to host IP Address over TCP if (cloud_properties.cloud_transport_type == "WS") { - boost::asio::connect(ws_.next_layer(), results.begin(), results.end(), ec); + boost::asio::connect(ws_->next_layer(), results.begin(), results.end(), ec); } #ifdef ENABLE_SECURITY else if (cloud_properties.cloud_transport_type == "WSS") { boost::asio::connect( - wss_.next_layer().next_layer(), results.begin(), results.end(), ec); + wss_->next_layer().next_layer(), results.begin(), results.end(), ec); } #endif // ENABLE_SECURITY @@ -141,19 +142,19 @@ TransportAdapter::Error WebsocketClientConnection::Start() { SDL_LOG_ERROR("Failed to add certificate authority: " << cloud_properties.certificate); SDL_LOG_ERROR(str_err); - Shutdown(); + ResetWebsocketStream(cloud_properties.cloud_transport_type); return TransportAdapter::FAIL; } // Perform SSL Handshake - wss_.next_layer().handshake(ssl::stream_base::client, ec); + wss_->next_layer().handshake(ssl::stream_base::client, ec); if (ec) { std::string str_err = "ErrorMessage: " + ec.message(); SDL_LOG_ERROR("Could not complete SSL Handshake failed with host/port: " << host << ":" << port); SDL_LOG_ERROR(str_err); - Shutdown(); + ResetWebsocketStream(cloud_properties.cloud_transport_type); return TransportAdapter::FAIL; } } @@ -161,11 +162,11 @@ TransportAdapter::Error WebsocketClientConnection::Start() { // Perform websocket handshake if (cloud_properties.cloud_transport_type == "WS") { - ws_.handshake(host, cloud_device->GetTarget(), ec); + ws_->handshake(host, cloud_device->GetTarget(), ec); } #ifdef ENABLE_SECURITY else if (cloud_properties.cloud_transport_type == "WSS") { - wss_.handshake(host, cloud_device->GetTarget(), ec); + wss_->handshake(host, cloud_device->GetTarget(), ec); } #endif // ENABLE_SECURITY if (ec) { @@ -173,16 +174,17 @@ TransportAdapter::Error WebsocketClientConnection::Start() { SDL_LOG_ERROR("Could not complete handshake with host/port: " << host << ":" << port); SDL_LOG_ERROR(str_err); + ResetWebsocketStream(cloud_properties.cloud_transport_type); return TransportAdapter::FAIL; } // Set the binary message write option if (cloud_properties.cloud_transport_type == "WS") { - ws_.binary(true); + ws_->binary(true); } #ifdef ENABLE_SECURITY else if (cloud_properties.cloud_transport_type == "WSS") { - wss_.binary(true); + wss_->binary(true); } #endif // ENABLE_SECURITY write_thread_->Start(threads::ThreadOptions()); @@ -190,20 +192,20 @@ TransportAdapter::Error WebsocketClientConnection::Start() { // Start async read if (cloud_properties.cloud_transport_type == "WS") { - ws_.async_read(buffer_, - std::bind(&WebsocketClientConnection::OnRead, - this, - std::placeholders::_1, - std::placeholders::_2)); - } -#ifdef ENABLE_SECURITY - else if (cloud_properties.cloud_transport_type == "WSS") { - wss_.async_read(buffer_, + ws_->async_read(buffer_, std::bind(&WebsocketClientConnection::OnRead, this, std::placeholders::_1, std::placeholders::_2)); } +#ifdef ENABLE_SECURITY + else if (cloud_properties.cloud_transport_type == "WSS") { + wss_->async_read(buffer_, + std::bind(&WebsocketClientConnection::OnRead, + this, + std::placeholders::_1, + std::placeholders::_2)); + } #endif // ENABLE_SECURITY boost::asio::post(io_pool_, [&]() { ioc_.run(); }); @@ -225,20 +227,20 @@ void WebsocketClientConnection::Recv(boost::system::error_code ec) { return; } if (cloud_properties.cloud_transport_type == "WS") { - ws_.async_read(buffer_, - std::bind(&WebsocketClientConnection::OnRead, - this, - std::placeholders::_1, - std::placeholders::_2)); - } -#ifdef ENABLE_SECURITY - else if (cloud_properties.cloud_transport_type == "WSS") { - wss_.async_read(buffer_, + ws_->async_read(buffer_, std::bind(&WebsocketClientConnection::OnRead, this, std::placeholders::_1, std::placeholders::_2)); } +#ifdef ENABLE_SECURITY + else if (cloud_properties.cloud_transport_type == "WSS") { + wss_->async_read(buffer_, + std::bind(&WebsocketClientConnection::OnRead, + this, + std::placeholders::_1, + std::placeholders::_2)); + } #endif // ENABLE_SECURITY } @@ -248,7 +250,15 @@ void WebsocketClientConnection::OnRead(boost::system::error_code ec, if (ec) { std::string str_err = "ErrorMessage: " + ec.message(); SDL_LOG_ERROR(str_err); - boost::beast::get_lowest_layer(ws_).close(); + if (cloud_properties.cloud_transport_type == "WS") { + boost::beast::get_lowest_layer(*ws_).close(); + } +#ifdef ENABLE_SECURITY + else if (cloud_properties.cloud_transport_type == "WSS") { + boost::beast::get_lowest_layer(*wss_).close(); + } +#endif // ENABLE_SECURITY + ioc_.stop(); Shutdown(); return; @@ -287,7 +297,10 @@ void WebsocketClientConnection::Shutdown() { if (thread_delegate_) { thread_delegate_->SetShutdown(); - write_thread_->Stop(threads::Thread::kThreadSoftStop); + + if (write_thread_->IsRunning()) { + write_thread_->Stop(threads::Thread::kThreadSoftStop); + } delete thread_delegate_; thread_delegate_ = NULL; threads::DeleteThread(write_thread_); @@ -299,6 +312,22 @@ void WebsocketClientConnection::Shutdown() { controller_->DisconnectDone(device_uid_, app_handle_); } +void WebsocketClientConnection::ResetWebsocketStream( + std::string cloud_transport_type) { + if (cloud_transport_type == "WS") { + ws_.reset(); + ws_ = std::make_shared(ioc_); + } +#ifdef ENABLE_SECURITY + else if (cloud_transport_type == "WSS") { + ctx_ = ssl::context(ssl::context::tlsv12_client); + SSL_CTX_set_cipher_list(ctx_.native_handle(), wss_ciphers_); + wss_.reset(); + wss_ = std::make_shared(ioc_, ctx_); + } +#endif // ENABLE_SECURITY +} + WebsocketClientConnection::LoopThreadDelegate::LoopThreadDelegate( MessageQueue* message_queue, WebsocketClientConnection* handler) @@ -330,12 +359,12 @@ void WebsocketClientConnection::LoopThreadDelegate::DrainQueue() { if (!shutdown_) { boost::system::error_code ec; if (handler_.cloud_properties.cloud_transport_type == "WS") { - handler_.ws_.write( + handler_.ws_->write( boost::asio::buffer(message_ptr->data(), message_ptr->data_size())); } #ifdef ENABLE_SECURITY else if (handler_.cloud_properties.cloud_transport_type == "WSS") { - handler_.wss_.write( + handler_.wss_->write( boost::asio::buffer(message_ptr->data(), message_ptr->data_size())); } #endif // ENABLE_SECURITY -- cgit v1.2.1