diff options
Diffstat (limited to 'src/components/transport_manager/src')
38 files changed, 4064 insertions, 524 deletions
diff --git a/src/components/transport_manager/src/bluetooth/bluetooth_connection_factory.cc b/src/components/transport_manager/src/bluetooth/bluetooth_connection_factory.cc index 0d8a77c88f..19f4078443 100644 --- a/src/components/transport_manager/src/bluetooth/bluetooth_connection_factory.cc +++ b/src/components/transport_manager/src/bluetooth/bluetooth_connection_factory.cc @@ -33,11 +33,10 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#include "transport_manager/transport_adapter/transport_adapter_controller.h" #include "transport_manager/bluetooth/bluetooth_connection_factory.h" #include "transport_manager/bluetooth/bluetooth_socket_connection.h" +#include "transport_manager/transport_adapter/transport_adapter_controller.h" #include "utils/logger.h" -#include "utils/make_shared.h" namespace transport_manager { namespace transport_adapter { @@ -55,8 +54,8 @@ TransportAdapter::Error BluetoothConnectionFactory::Init() { TransportAdapter::Error BluetoothConnectionFactory::CreateConnection( const DeviceUID& device_uid, const ApplicationHandle& app_handle) { LOG4CXX_AUTO_TRACE(logger_); - utils::SharedPtr<BluetoothSocketConnection> connection = - utils::MakeShared<BluetoothSocketConnection>( + std::shared_ptr<BluetoothSocketConnection> connection = + std::make_shared<BluetoothSocketConnection>( device_uid, app_handle, controller_); controller_->ConnectionCreated(connection, device_uid, app_handle); TransportAdapter::Error error = connection->Start(); diff --git a/src/components/transport_manager/src/bluetooth/bluetooth_device.cc b/src/components/transport_manager/src/bluetooth/bluetooth_device.cc index 038515170d..734d8fe1e9 100644 --- a/src/components/transport_manager/src/bluetooth/bluetooth_device.cc +++ b/src/components/transport_manager/src/bluetooth/bluetooth_device.cc @@ -36,15 +36,16 @@ #include <bluetooth/bluetooth.h> #include <bluetooth/hci.h> #include <bluetooth/hci_lib.h> +#include <bluetooth/rfcomm.h> #include <bluetooth/sdp.h> #include <bluetooth/sdp_lib.h> -#include <bluetooth/rfcomm.h> #include <errno.h> -#include <sys/types.h> #include <sys/socket.h> +#include <sys/types.h> #include <algorithm> +#include <iostream> #include <limits> #include "utils/logger.h" @@ -56,7 +57,8 @@ bool BluetoothDevice::GetRfcommChannel(const ApplicationHandle app_handle, uint8_t* channel_out) { LOG4CXX_TRACE(logger_, "enter. app_handle: " << app_handle - << ", channel_out: " << channel_out); + << ", channel_out: " << std::hex + << reinterpret_cast<void*>(channel_out)); if (app_handle < 0 || app_handle > std::numeric_limits<uint8_t>::max()) { LOG4CXX_TRACE(logger_, "exit with FALSE. Condition: app_handle < 0 || app_handle > " diff --git a/src/components/transport_manager/src/bluetooth/bluetooth_device_scanner.cc b/src/components/transport_manager/src/bluetooth/bluetooth_device_scanner.cc index 743c95f2e6..4759b2003a 100644 --- a/src/components/transport_manager/src/bluetooth/bluetooth_device_scanner.cc +++ b/src/components/transport_manager/src/bluetooth/bluetooth_device_scanner.cc @@ -38,18 +38,18 @@ #include <bluetooth/bluetooth.h> #include <bluetooth/hci.h> #include <bluetooth/hci_lib.h> +#include <bluetooth/rfcomm.h> #include <bluetooth/sdp.h> #include <bluetooth/sdp_lib.h> -#include <bluetooth/rfcomm.h> #include <errno.h> -#include <sys/types.h> #include <sys/socket.h> #include <sys/time.h> +#include <sys/types.h> #include <unistd.h> -#include <vector> #include <sstream> -#include "transport_manager/bluetooth/bluetooth_transport_adapter.h" +#include <vector> #include "transport_manager/bluetooth/bluetooth_device.h" +#include "transport_manager/bluetooth/bluetooth_transport_adapter.h" #include "utils/logger.h" #include "utils/threads/thread.h" @@ -133,7 +133,27 @@ BluetoothDeviceScanner::BluetoothDeviceScanner( 0xA8}; sdp_uuid128_create(&smart_device_link_service_uuid_, smart_device_link_service_uuid_data); - thread_ = threads::CreateThread("BT Device Scaner", + thread_ = threads::CreateThread("BT Device Scanner", + new BluetoothDeviceScannerDelegate(this)); +} + +BluetoothDeviceScanner::BluetoothDeviceScanner( + TransportAdapterController* controller, + bool auto_repeat_search, + int auto_repeat_pause_sec, + const uint8_t* smart_device_link_service_uuid_data) + : controller_(controller) + , thread_(NULL) + , shutdown_requested_(false) + , ready_(true) + , device_scan_requested_(false) + , device_scan_requested_lock_() + , device_scan_requested_cv_() + , auto_repeat_search_(auto_repeat_search) + , auto_repeat_pause_sec_(auto_repeat_pause_sec) { + sdp_uuid128_create(&smart_device_link_service_uuid_, + smart_device_link_service_uuid_data); + thread_ = threads::CreateThread("BT Device Scanner", new BluetoothDeviceScannerDelegate(this)); } @@ -262,8 +282,8 @@ void BluetoothDeviceScanner::CheckSDLServiceOnDevices( deviceName[name_len - 1] = '\0'; } - Device* bluetooth_device = - new BluetoothDevice(bd_address, deviceName, sdl_rfcomm_channels[i]); + auto bluetooth_device = std::make_shared<BluetoothDevice>( + bd_address, deviceName, sdl_rfcomm_channels[i]); if (bluetooth_device) { LOG4CXX_INFO(logger_, "Bluetooth device created successfully"); discovered_devices->push_back(bluetooth_device); diff --git a/src/components/transport_manager/src/bluetooth/bluetooth_socket_connection.cc b/src/components/transport_manager/src/bluetooth/bluetooth_socket_connection.cc index b4370e4b12..78597ac2ad 100644 --- a/src/components/transport_manager/src/bluetooth/bluetooth_socket_connection.cc +++ b/src/components/transport_manager/src/bluetooth/bluetooth_socket_connection.cc @@ -33,13 +33,13 @@ #include "transport_manager/bluetooth/bluetooth_socket_connection.h" -#include <unistd.h> #include <bluetooth/bluetooth.h> #include <bluetooth/hci.h> #include <bluetooth/hci_lib.h> +#include <bluetooth/rfcomm.h> #include <bluetooth/sdp.h> #include <bluetooth/sdp_lib.h> -#include <bluetooth/rfcomm.h> +#include <unistd.h> #include "transport_manager/bluetooth/bluetooth_device.h" #include "transport_manager/transport_adapter/transport_adapter_controller.h" @@ -93,9 +93,9 @@ bool BluetoothSocketConnection::Establish(ConnectError** error) { do { rfcomm_socket = socket(AF_BLUETOOTH, SOCK_STREAM, BTPROTO_RFCOMM); if (-1 == rfcomm_socket) { - LOG4CXX_ERROR_WITH_ERRNO(logger_, - "Failed to create RFCOMM socket for device " - << device_handle()); + LOG4CXX_ERROR_WITH_ERRNO( + logger_, + "Failed to create RFCOMM socket for device " << device_handle()); *error = new ConnectError(); LOG4CXX_TRACE(logger_, "exit with FALSE"); return false; diff --git a/src/components/transport_manager/src/bluetooth/bluetooth_transport_adapter.cc b/src/components/transport_manager/src/bluetooth/bluetooth_transport_adapter.cc index 0f83f32c60..066751c474 100644 --- a/src/components/transport_manager/src/bluetooth/bluetooth_transport_adapter.cc +++ b/src/components/transport_manager/src/bluetooth/bluetooth_transport_adapter.cc @@ -34,18 +34,18 @@ */ #include <errno.h> -#include <sys/types.h> #include <sys/socket.h> +#include <sys/types.h> #include <unistd.h> +#include <bluetooth/bluetooth.h> #include <iomanip> #include <set> -#include <bluetooth/bluetooth.h> -#include "transport_manager/bluetooth/bluetooth_transport_adapter.h" -#include "transport_manager/bluetooth/bluetooth_device_scanner.h" #include "transport_manager/bluetooth/bluetooth_connection_factory.h" #include "transport_manager/bluetooth/bluetooth_device.h" +#include "transport_manager/bluetooth/bluetooth_device_scanner.h" +#include "transport_manager/bluetooth/bluetooth_transport_adapter.h" #include "utils/logger.h" @@ -57,12 +57,14 @@ CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager") BluetoothTransportAdapter::~BluetoothTransportAdapter() {} BluetoothTransportAdapter::BluetoothTransportAdapter( - resumption::LastState& last_state, const TransportManagerSettings& settings) - : TransportAdapterImpl(new BluetoothDeviceScanner(this, true, 0), - new BluetoothConnectionFactory(this), - NULL, - last_state, - settings) {} + resumption::LastStateWrapperPtr last_state_wrapper, + const TransportManagerSettings& settings) + : TransportAdapterImpl( + new BluetoothDeviceScanner(this, true, 0, settings.bluetooth_uuid()), + new BluetoothConnectionFactory(this), + NULL, + last_state_wrapper, + settings) {} DeviceType BluetoothTransportAdapter::GetDeviceType() const { return BLUETOOTH; @@ -80,8 +82,8 @@ void BluetoothTransportAdapter::Store() const { if (!device) { // device could have been disconnected continue; } - utils::SharedPtr<BluetoothDevice> bluetooth_device = - DeviceSptr::static_pointer_cast<BluetoothDevice>(device); + std::shared_ptr<BluetoothDevice> bluetooth_device = + std::static_pointer_cast<BluetoothDevice>(device); Json::Value device_dictionary; device_dictionary["name"] = bluetooth_device->name(); char address[18]; @@ -111,23 +113,25 @@ void BluetoothTransportAdapter::Store() const { } } bluetooth_adapter_dictionary["devices"] = devices_dictionary; - Json::Value& dictionary = last_state().get_dictionary(); + resumption::LastStateAccessor accessor = last_state_wrapper_->get_accessor(); + Json::Value dictionary = accessor.GetData().dictionary(); dictionary["TransportManager"]["BluetoothAdapter"] = bluetooth_adapter_dictionary; + accessor.GetMutableData().set_dictionary(dictionary); LOG4CXX_TRACE(logger_, "exit"); } bool BluetoothTransportAdapter::Restore() { LOG4CXX_TRACE(logger_, "enter"); bool errors_occured = false; + resumption::LastStateAccessor accessor = last_state_wrapper_->get_accessor(); + Json::Value dictionary = accessor.GetData().dictionary(); const Json::Value bluetooth_adapter_dictionary = - last_state().get_dictionary()["TransportManager"]["BluetoothAdapter"]; + dictionary["TransportManager"]["BluetoothAdapter"]; const Json::Value devices_dictionary = bluetooth_adapter_dictionary["devices"]; - for (Json::Value::const_iterator i = devices_dictionary.begin(); - i != devices_dictionary.end(); - ++i) { - const Json::Value device_dictionary = *i; + for (const auto& bt_device : devices_dictionary) { + const Json::Value device_dictionary = bt_device; std::string name = device_dictionary["name"].asString(); std::string address_record = device_dictionary["address"].asString(); bdaddr_t address; @@ -135,10 +139,8 @@ bool BluetoothTransportAdapter::Restore() { RfcommChannelVector rfcomm_channels; const Json::Value applications_dictionary = device_dictionary["applications"]; - for (Json::Value::const_iterator j = applications_dictionary.begin(); - j != applications_dictionary.end(); - ++j) { - const Json::Value application_dictionary = *j; + for (const auto& application : applications_dictionary) { + const Json::Value application_dictionary = application; std::string rfcomm_channel_record = application_dictionary["rfcomm_channel"].asString(); uint8_t rfcomm_channel = @@ -149,11 +151,9 @@ bool BluetoothTransportAdapter::Restore() { new BluetoothDevice(address, name.c_str(), rfcomm_channels); DeviceSptr device(bluetooth_device); AddDevice(device); - for (RfcommChannelVector::const_iterator j = rfcomm_channels.begin(); - j != rfcomm_channels.end(); - ++j) { + for (const auto& channel : rfcomm_channels) { ApplicationHandle app_handle = - *j; // for Bluetooth device app_handle is just RFCOMM channel + channel; // for Bluetooth device app_handle is just RFCOMM channel if (Error::OK != Connect(device->unique_device_id(), app_handle)) { errors_occured = true; } diff --git a/src/components/transport_manager/src/cloud/cloud_device.cc b/src/components/transport_manager/src/cloud/cloud_device.cc new file mode 100644 index 0000000000..9225589d57 --- /dev/null +++ b/src/components/transport_manager/src/cloud/cloud_device.cc @@ -0,0 +1,97 @@ +/* + * + * Copyright (c) 2018, Livio + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following + * disclaimer in the documentation and/or other materials provided with the + * distribution. + * + * Neither the name of the Ford Motor Company nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "transport_manager/cloud/cloud_device.h" + +#include "utils/logger.h" + +namespace transport_manager { +namespace transport_adapter { +CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager") + +CloudDevice::CloudDevice(std::string& host, + std::string& port, + std::string& name) + : Device(name, std::string(name)) + , endpoint_(CloudAppEndpoint{.host = host, + .port = port, + .path = "/", + .query = "", + .fragment = ""}) {} + +CloudDevice::CloudDevice(CloudAppEndpoint& endpoint, std::string& name) + : Device(name, std::string(name)), endpoint_(endpoint) {} + +bool CloudDevice::IsSameAs(const Device* other) const { + LOG4CXX_TRACE(logger_, "enter. device: " << other); + + const CloudDevice* other_cloud_device = + dynamic_cast<const CloudDevice*>(other); + + if (!other_cloud_device) { + return false; + } + + if (GetHost() != other_cloud_device->GetHost()) { + return false; + } + + if (GetPort() != other_cloud_device->GetPort()) { + return false; + } + + if (GetTarget() != other_cloud_device->GetTarget()) { + return false; + } + + return true; +} + +ApplicationList CloudDevice::GetApplicationList() const { + return ApplicationList{0}; +} + +const std::string& CloudDevice::GetHost() const { + return endpoint_.host; +} + +const std::string& CloudDevice::GetPort() const { + return endpoint_.port; +} + +const std::string CloudDevice::GetTarget() const { + return endpoint_.path + endpoint_.query + endpoint_.fragment; +} + +} // namespace transport_adapter +} // namespace transport_manager diff --git a/src/components/transport_manager/src/cloud/cloud_websocket_connection_factory.cc b/src/components/transport_manager/src/cloud/cloud_websocket_connection_factory.cc new file mode 100644 index 0000000000..d072685eef --- /dev/null +++ b/src/components/transport_manager/src/cloud/cloud_websocket_connection_factory.cc @@ -0,0 +1,85 @@ +/* + * \file cloud_websocket_connection_factory.cc + * \brief CloudWebsocketConnectionFactory class source file. + * + * Copyright (c) 2018, Livio + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following + * disclaimer in the documentation and/or other materials provided with the + * distribution. + * + * Neither the name of the Ford Motor Company nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "transport_manager/cloud/cloud_websocket_connection_factory.h" +#include "transport_manager/cloud/websocket_client_connection.h" +#include "transport_manager/transport_adapter/transport_adapter_controller.h" +#include "utils/logger.h" + +#include "transport_manager/cloud/cloud_device.h" + +namespace transport_manager { +namespace transport_adapter { + +CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager") + +CloudWebsocketConnectionFactory::CloudWebsocketConnectionFactory( + TransportAdapterController* controller) + : controller_(controller) {} + +TransportAdapter::Error CloudWebsocketConnectionFactory::Init() { + return TransportAdapter::OK; +} + +TransportAdapter::Error CloudWebsocketConnectionFactory::CreateConnection( + const DeviceUID& device_uid, const ApplicationHandle& app_handle) { + LOG4CXX_AUTO_TRACE(logger_); + auto connection = controller_->FindPendingConnection(device_uid, app_handle); + + std::shared_ptr<WebsocketClientConnection> ws_connection = + std::dynamic_pointer_cast<WebsocketClientConnection>(connection); + if (ws_connection.use_count() == 0) { + return TransportAdapter::Error::BAD_PARAM; + } + + TransportAdapter::Error error = ws_connection->Start(); + if (TransportAdapter::OK != error) { + LOG4CXX_ERROR( + logger_, + "Cloud Websocket connection::Start() failed with error: " << error); + } + return error; +} + +void CloudWebsocketConnectionFactory::Terminate() {} + +bool CloudWebsocketConnectionFactory::IsInitialised() const { + return true; +} + +CloudWebsocketConnectionFactory::~CloudWebsocketConnectionFactory() {} + +} // namespace transport_adapter +} // namespace transport_manager diff --git a/src/components/transport_manager/src/cloud/cloud_websocket_transport_adapter.cc b/src/components/transport_manager/src/cloud/cloud_websocket_transport_adapter.cc new file mode 100644 index 0000000000..2d1de703b1 --- /dev/null +++ b/src/components/transport_manager/src/cloud/cloud_websocket_transport_adapter.cc @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2018, Livio + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following + * disclaimer in the documentation and/or other materials provided with the + * distribution. + * + * Neither the name of the Ford Motor Company nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "transport_manager/cloud/cloud_websocket_transport_adapter.h" +#include "transport_manager/cloud/cloud_websocket_connection_factory.h" + +#include "transport_manager/cloud/cloud_device.h" +#include "transport_manager/cloud/websocket_client_connection.h" + +#include <boost/regex.hpp> + +namespace transport_manager { +namespace transport_adapter { + +CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager") + +CloudWebsocketTransportAdapter::CloudWebsocketTransportAdapter( + resumption::LastStateWrapperPtr last_state_wrapper, + const TransportManagerSettings& settings) + : TransportAdapterImpl(NULL, + new CloudWebsocketConnectionFactory(this), + NULL, + last_state_wrapper, + settings) {} + +CloudWebsocketTransportAdapter::~CloudWebsocketTransportAdapter() {} + +void CloudWebsocketTransportAdapter::SetAppCloudTransportConfig( + std::string app_id, CloudAppProperties properties) { + transport_config_[app_id] = properties; +} + +const CloudAppProperties& +CloudWebsocketTransportAdapter::GetAppCloudTransportConfig(std::string app_id) { + return transport_config_[app_id]; +} + +DeviceType CloudWebsocketTransportAdapter::GetDeviceType() const { + return CLOUD_WEBSOCKET; +} + +void CloudWebsocketTransportAdapter::Store() const {} + +bool CloudWebsocketTransportAdapter::Restore() { + return true; +} + +void CloudWebsocketTransportAdapter::CreateDevice(const std::string& uid) { + // If the device has already been created, just ignore the request + DeviceSptr device = FindDevice(uid); + if (device.use_count() != 0) { + return; + } + + std::string protocol_pattern = "(wss?)"; + std::string host_pattern = + "(([^?#%\\\\/@:\\s]{1,})\\:?([^?#%\\\\/@\\s]*)\\@?([^?#%\\\\/\\s]*))"; + std::string port_pattern = "(\\d{2,5})"; + // Optional parameters + std::string path_pattern = "((\\/[^\\/#?\\s]+)*)?\\/?"; + std::string query_pattern = "(\\?[^=&#\\s]*=?[^#\\s]*&?)?"; + std::string fragment_pattern = "(#[^\\s]*)?"; + + // Extract host and port from endpoint string + boost::regex group_pattern(protocol_pattern + ":\\/\\/" + host_pattern + ":" + + port_pattern + path_pattern + query_pattern + + fragment_pattern, + boost::regex::icase); + boost::smatch results; + std::string str = uid; + + if (!boost::regex_search(str, results, group_pattern)) { + LOG4CXX_DEBUG(logger_, "Invalid Pattern: " << uid); + return; + } + + LOG4CXX_DEBUG(logger_, "#Results: " << results.size()); + std::string results_str; + for (size_t i = 0; i < results.size(); i++) { + results_str += " R[" + std::to_string(i) + "]:"; + results_str += + (results[i].length() != 0) ? results[i] : std::string("<EMPTY>"); + } + LOG4CXX_DEBUG(logger_, "Results: " << results_str); + + std::string device_id = uid; + + CloudAppEndpoint endpoint{.host = results[2], + .port = results[6], + .path = results[7] + "/", + .query = results[9], + .fragment = results[10]}; + + LOG4CXX_DEBUG(logger_, + "Creating Cloud Device For Host: " + << endpoint.host << " at Port: " << endpoint.port + << " with Target: " + << (endpoint.path + endpoint.query + endpoint.fragment)); + + auto cloud_device = std::make_shared<CloudDevice>(endpoint, device_id); + + DeviceVector devices{cloud_device}; + + SearchDeviceDone(devices); + + // Create connection object, do not start until app is activated + std::shared_ptr<WebsocketClientConnection> connection = + std::make_shared<WebsocketClientConnection>(uid, 0, this); + + ConnectionCreated(connection, uid, 0); + ConnectPending(uid, 0); + + return; +} +} // namespace transport_adapter +} // namespace transport_manager diff --git a/src/components/transport_manager/src/cloud/websocket_client_connection.cc b/src/components/transport_manager/src/cloud/websocket_client_connection.cc new file mode 100644 index 0000000000..794cf57208 --- /dev/null +++ b/src/components/transport_manager/src/cloud/websocket_client_connection.cc @@ -0,0 +1,358 @@ +/* + * + * Copyright (c) 2018, Livio + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following + * disclaimer in the documentation and/or other materials provided with the + * distribution. + * + * Neither the name of the Ford Motor Company nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "transport_manager/cloud/websocket_client_connection.h" +#include "transport_manager/cloud/cloud_device.h" + +#include "transport_manager/transport_adapter/transport_adapter_controller.h" + +#include "utils/logger.h" + +namespace transport_manager { +namespace transport_adapter { +CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager") + +WebsocketClientConnection::WebsocketClientConnection( + const DeviceUID& device_uid, + const ApplicationHandle& app_handle, + TransportAdapterController* controller) + : controller_(controller) + , resolver_(ioc_) + , ws_(ioc_) +#ifdef ENABLE_SECURITY + , ctx_(ssl::context::sslv23_client) + , wss_(ioc_, ctx_) +#endif // ENABLE_SECURITY + , shutdown_(false) + , thread_delegate_(new LoopThreadDelegate(&message_queue_, this)) + , write_thread_(threads::CreateThread("WS Async Send", thread_delegate_)) + , device_uid_(device_uid) + , app_handle_(app_handle) + , io_pool_(1) { +} + +WebsocketClientConnection::~WebsocketClientConnection() { + ioc_.stop(); + io_pool_.join(); +} + +#ifdef ENABLE_SECURITY +void WebsocketClientConnection::AddCertificateAuthority( + const std::string cert, boost::system::error_code& ec) { + ctx_.add_certificate_authority(boost::asio::buffer(cert.data(), cert.size()), + ec); + if (ec) { + return; + } + + wss_.next_layer().set_verify_mode(ssl::verify_peer); +} +#endif // ENABLE_SECURITY + +TransportAdapter::Error WebsocketClientConnection::Start() { + LOG4CXX_AUTO_TRACE(logger_); + DeviceSptr device = controller_->FindDevice(device_uid_); + CloudDevice* cloud_device = static_cast<CloudDevice*>(device.get()); + CloudWebsocketTransportAdapter* cloud_ta = + static_cast<CloudWebsocketTransportAdapter*>(controller_); + cloud_properties = cloud_ta->GetAppCloudTransportConfig(device_uid_); + auto const host = cloud_device->GetHost(); + auto const port = cloud_device->GetPort(); + boost::system::error_code ec; + + LOG4CXX_DEBUG(logger_, "Cloud app endpoint: " << cloud_properties.endpoint); + LOG4CXX_DEBUG(logger_, + "Cloud app certificate: " << cloud_properties.certificate); + LOG4CXX_DEBUG( + logger_, + "Cloud app authentication token: " << cloud_properties.auth_token); + LOG4CXX_DEBUG( + logger_, + "Cloud app transport type: " << cloud_properties.cloud_transport_type); + LOG4CXX_DEBUG(logger_, + "Cloud app hybrid app preference: " + << cloud_properties.hybrid_app_preference); + + auto const results = resolver_.resolve(host, port, ec); + if (ec) { + std::string str_err = "ErrorMessage: " + ec.message(); + LOG4CXX_ERROR(logger_, "Could not resolve host/port: " << str_err); + return TransportAdapter::FAIL; + } + + // Make Connection to host IP Address over TCP + if (cloud_properties.cloud_transport_type == "WS") { + boost::asio::connect(ws_.next_layer(), results.begin(), results.end(), ec); + } +#ifdef ENABLE_SECURITY + else if (cloud_properties.cloud_transport_type == "WSS") { + boost::asio::connect( + wss_.next_layer().next_layer(), results.begin(), results.end(), ec); + } +#endif // ENABLE_SECURITY + + if (ec) { + std::string str_err = "ErrorMessage: " + ec.message(); + LOG4CXX_ERROR(logger_, + "Could not connect to websocket: " << host << ":" << port); + LOG4CXX_ERROR(logger_, str_err); + return TransportAdapter::FAIL; + } + +#ifdef ENABLE_SECURITY + if (cloud_properties.cloud_transport_type == "WSS") { + AddCertificateAuthority(cloud_properties.certificate, ec); + + if (ec) { + std::string str_err = "ErrorMessage: " + ec.message(); + LOG4CXX_ERROR(logger_, + "Failed to add certificate authority: " + << cloud_properties.certificate); + LOG4CXX_ERROR(logger_, str_err); + Shutdown(); + return TransportAdapter::FAIL; + } + + // Perform SSL Handshake + wss_.next_layer().handshake(ssl::stream_base::client, ec); + + if (ec) { + std::string str_err = "ErrorMessage: " + ec.message(); + LOG4CXX_ERROR(logger_, + "Could not complete SSL Handshake failed with host/port: " + << host << ":" << port); + LOG4CXX_ERROR(logger_, str_err); + Shutdown(); + return TransportAdapter::FAIL; + } + } +#endif // ENABLE_SECURITY + + // Perform websocket handshake + if (cloud_properties.cloud_transport_type == "WS") { + ws_.handshake(host, cloud_device->GetTarget(), ec); + } +#ifdef ENABLE_SECURITY + else if (cloud_properties.cloud_transport_type == "WSS") { + wss_.handshake(host, cloud_device->GetTarget(), ec); + } +#endif // ENABLE_SECURITY + if (ec) { + std::string str_err = "ErrorMessage: " + ec.message(); + LOG4CXX_ERROR( + logger_, + "Could not complete handshake with host/port: " << host << ":" << port); + LOG4CXX_ERROR(logger_, str_err); + return TransportAdapter::FAIL; + } + + // Set the binary message write option + if (cloud_properties.cloud_transport_type == "WS") { + ws_.binary(true); + } +#ifdef ENABLE_SECURITY + else if (cloud_properties.cloud_transport_type == "WSS") { + wss_.binary(true); + } +#endif // ENABLE_SECURITY + write_thread_->start(threads::ThreadOptions()); + controller_->ConnectDone(device_uid_, app_handle_); + + // Start async read + if (cloud_properties.cloud_transport_type == "WS") { + ws_.async_read(buffer_, + std::bind(&WebsocketClientConnection::OnRead, + this, + std::placeholders::_1, + std::placeholders::_2)); + } +#ifdef ENABLE_SECURITY + else if (cloud_properties.cloud_transport_type == "WSS") { + wss_.async_read(buffer_, + std::bind(&WebsocketClientConnection::OnRead, + this, + std::placeholders::_1, + std::placeholders::_2)); + } +#endif // ENABLE_SECURITY + + boost::asio::post(io_pool_, [&]() { ioc_.run(); }); + + LOG4CXX_DEBUG( + logger_, + "Successfully started websocket connection @: " << host << ":" << port); + return TransportAdapter::OK; +} + +void WebsocketClientConnection::Recv(boost::system::error_code ec) { + if (shutdown_) { + return; + } + + if (ec) { + std::string str_err = "ErrorMessage: " + ec.message(); + LOG4CXX_ERROR(logger_, str_err); + Shutdown(); + return; + } + if (cloud_properties.cloud_transport_type == "WS") { + ws_.async_read(buffer_, + std::bind(&WebsocketClientConnection::OnRead, + this, + std::placeholders::_1, + std::placeholders::_2)); + } +#ifdef ENABLE_SECURITY + else if (cloud_properties.cloud_transport_type == "WSS") { + wss_.async_read(buffer_, + std::bind(&WebsocketClientConnection::OnRead, + this, + std::placeholders::_1, + std::placeholders::_2)); + } +#endif // ENABLE_SECURITY +} + +void WebsocketClientConnection::OnRead(boost::system::error_code ec, + std::size_t bytes_transferred) { + boost::ignore_unused(bytes_transferred); + if (ec) { + std::string str_err = "ErrorMessage: " + ec.message(); + LOG4CXX_ERROR(logger_, str_err); + ws_.lowest_layer().close(); + ioc_.stop(); + Shutdown(); + return; + } + std::string data_str = boost::beast::buffers_to_string(buffer_.data()); + + ssize_t size = (ssize_t)buffer_.size(); + const uint8_t* data = boost::asio::buffer_cast<const uint8_t*>( + boost::beast::buffers_front(buffer_.data())); + + ::protocol_handler::RawMessagePtr frame( + new protocol_handler::RawMessage(0, 0, data, size, false)); + + controller_->DataReceiveDone(device_uid_, app_handle_, frame); + + buffer_.consume(buffer_.size()); + Recv(ec); +} + +TransportAdapter::Error WebsocketClientConnection::SendData( + ::protocol_handler::RawMessagePtr message) { + LOG4CXX_AUTO_TRACE(logger_); + sync_primitives::AutoLock auto_lock(frames_to_send_mutex_); + message_queue_.push(message); + return TransportAdapter::OK; +} + +TransportAdapter::Error WebsocketClientConnection::Disconnect() { + LOG4CXX_AUTO_TRACE(logger_); + Shutdown(); + return TransportAdapter::OK; +} + +void WebsocketClientConnection::Shutdown() { + shutdown_ = true; + + if (thread_delegate_) { + thread_delegate_->SetShutdown(); + write_thread_->join(); + delete thread_delegate_; + thread_delegate_ = NULL; + threads::DeleteThread(write_thread_); + write_thread_ = NULL; + } + if (buffer_.size()) { + buffer_.consume(buffer_.size()); + } + controller_->DisconnectDone(device_uid_, app_handle_); +} + +WebsocketClientConnection::LoopThreadDelegate::LoopThreadDelegate( + MessageQueue<Message, AsyncQueue>* message_queue, + WebsocketClientConnection* handler) + : message_queue_(*message_queue), handler_(*handler), shutdown_(false) {} + +void WebsocketClientConnection::LoopThreadDelegate::threadMain() { + while (!message_queue_.IsShuttingDown() && !shutdown_) { + DrainQueue(); + message_queue_.wait(); + } + DrainQueue(); +} + +void WebsocketClientConnection::LoopThreadDelegate::exitThreadMain() { + shutdown_ = true; + if (!message_queue_.IsShuttingDown()) { + message_queue_.Shutdown(); + } +} + +void WebsocketClientConnection::LoopThreadDelegate::DrainQueue() { + while (!message_queue_.empty()) { + Message message_ptr; + message_queue_.pop(message_ptr); + if (!shutdown_) { + boost::system::error_code ec; + if (handler_.cloud_properties.cloud_transport_type == "WS") { + handler_.ws_.write( + boost::asio::buffer(message_ptr->data(), message_ptr->data_size())); + } +#ifdef ENABLE_SECURITY + else if (handler_.cloud_properties.cloud_transport_type == "WSS") { + handler_.wss_.write( + boost::asio::buffer(message_ptr->data(), message_ptr->data_size())); + } +#endif // ENABLE_SECURITY + if (ec) { + LOG4CXX_ERROR(logger_, "Error writing to websocket"); + handler_.controller_->DataSendFailed(handler_.device_uid_, + handler_.app_handle_, + message_ptr, + DataSendError()); + } + } + } +} + +void WebsocketClientConnection::LoopThreadDelegate::SetShutdown() { + shutdown_ = true; + if (!message_queue_.IsShuttingDown()) { + message_queue_.Shutdown(); + } +} + +} // namespace transport_adapter +} // namespace transport_manager diff --git a/src/components/transport_manager/src/iap2_emulation/iap2_transport_adapter.cc b/src/components/transport_manager/src/iap2_emulation/iap2_transport_adapter.cc index 6b7d44ea8d..6a43f66c64 100644 --- a/src/components/transport_manager/src/iap2_emulation/iap2_transport_adapter.cc +++ b/src/components/transport_manager/src/iap2_emulation/iap2_transport_adapter.cc @@ -32,14 +32,14 @@ #include "transport_manager/iap2_emulation/iap2_transport_adapter.h" -#include <sys/types.h> -#include <sys/stat.h> #include <fcntl.h> -#include <unistd.h> #include <stdio.h> +#include <sys/stat.h> +#include <sys/types.h> +#include <unistd.h> -#include "utils/threads/thread.h" #include "utils/file_system.h" +#include "utils/threads/thread.h" namespace { static const mode_t mode = 0666; @@ -54,9 +54,9 @@ CREATE_LOGGERPTR_GLOBAL(logger_, "IAP2Emulation"); IAP2BluetoothEmulationTransportAdapter::IAP2BluetoothEmulationTransportAdapter( const uint16_t port, - resumption::LastState& last_state, + resumption::LastStateWrapperPtr last_state_wrapper, const TransportManagerSettings& settings) - : TcpTransportAdapter(port, last_state, settings) {} + : TcpTransportAdapter(port, last_state_wrapper, settings) {} void IAP2BluetoothEmulationTransportAdapter::DeviceSwitched( const DeviceUID& device_handle) { @@ -69,15 +69,21 @@ DeviceType IAP2BluetoothEmulationTransportAdapter::GetDeviceType() const { return IOS_BT; } +void IAP2BluetoothEmulationTransportAdapter::TransportConfigUpdated( + const TransportConfig& new_config) { + return; +} + IAP2USBEmulationTransportAdapter::IAP2USBEmulationTransportAdapter( const uint16_t port, - resumption::LastState& last_state, + resumption::LastStateWrapperPtr last_state_wrapper, const TransportManagerSettings& settings) - : TcpTransportAdapter(port, last_state, settings), out_(0) { + : TcpTransportAdapter(port, last_state_wrapper, settings), out_(0) { auto delegate = new IAPSignalHandlerDelegate(*this); signal_handler_ = threads::CreateThread("iAP signal handler", delegate); signal_handler_->start(); const auto result = mkfifo(out_signals_channel, mode); + UNUSED(result); LOG4CXX_DEBUG(logger_, "Out signals channel creation result: " << result); } @@ -119,10 +125,16 @@ DeviceType IAP2USBEmulationTransportAdapter::GetDeviceType() const { return IOS_USB; } +void IAP2USBEmulationTransportAdapter::TransportConfigUpdated( + const TransportConfig& new_config) { + return; +} + IAP2USBEmulationTransportAdapter::IAPSignalHandlerDelegate:: IAPSignalHandlerDelegate(IAP2USBEmulationTransportAdapter& adapter) : adapter_(adapter), run_flag_(true), in_(0) { const auto result = mkfifo(in_signals_channel, mode); + UNUSED(result); LOG4CXX_DEBUG(logger_, "In signals channel creation result: " << result); } @@ -170,5 +182,5 @@ void IAP2USBEmulationTransportAdapter::IAPSignalHandlerDelegate:: run_flag_ = false; ThreadDelegate::exitThreadMain(); } -} -} // namespace transport_manager::transport_adapter +} // namespace transport_adapter +} // namespace transport_manager diff --git a/src/components/transport_manager/src/tcp/network_interface_listener_impl.cc b/src/components/transport_manager/src/tcp/network_interface_listener_impl.cc new file mode 100644 index 0000000000..85c479134d --- /dev/null +++ b/src/components/transport_manager/src/tcp/network_interface_listener_impl.cc @@ -0,0 +1,42 @@ +#include "transport_manager/tcp/network_interface_listener_impl.h" +#include "platform_specific_network_interface_listener_impl.h" + +namespace transport_manager { +namespace transport_adapter { + +CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager") + +NetworkInterfaceListenerImpl::NetworkInterfaceListenerImpl( + TcpClientListener* tcp_client_listener, + const std::string designated_interface) + : platform_specific_impl_(new PlatformSpecificNetworkInterfaceListener( + tcp_client_listener, designated_interface)) { + LOG4CXX_AUTO_TRACE(logger_); +} + +NetworkInterfaceListenerImpl::~NetworkInterfaceListenerImpl() { + LOG4CXX_AUTO_TRACE(logger_); +} + +bool NetworkInterfaceListenerImpl::Init() { + LOG4CXX_AUTO_TRACE(logger_); + return platform_specific_impl_->Init(); +} + +void NetworkInterfaceListenerImpl::Deinit() { + LOG4CXX_AUTO_TRACE(logger_); + platform_specific_impl_->Deinit(); +} + +bool NetworkInterfaceListenerImpl::Start() { + LOG4CXX_AUTO_TRACE(logger_); + return platform_specific_impl_->Start(); +} + +bool NetworkInterfaceListenerImpl::Stop() { + LOG4CXX_AUTO_TRACE(logger_); + return platform_specific_impl_->Stop(); +} + +} // namespace transport_adapter +} // namespace transport_manager diff --git a/src/components/transport_manager/src/tcp/platform_specific/linux/platform_specific_network_interface_listener.cc b/src/components/transport_manager/src/tcp/platform_specific/linux/platform_specific_network_interface_listener.cc new file mode 100644 index 0000000000..2e79e84f2e --- /dev/null +++ b/src/components/transport_manager/src/tcp/platform_specific/linux/platform_specific_network_interface_listener.cc @@ -0,0 +1,673 @@ +#include "transport_manager/tcp/platform_specific/linux/platform_specific_network_interface_listener_impl.h" + +#include <arpa/inet.h> +#include <asm/types.h> +#include <errno.h> +#include <fcntl.h> +#include <ifaddrs.h> +#include <net/if.h> +#include <sys/select.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <unistd.h> + +#include <linux/netlink.h> +#include <linux/rtnetlink.h> + +#include "transport_manager/tcp/tcp_client_listener.h" +#include "utils/logger.h" +#include "utils/threads/thread.h" + +namespace transport_manager { +namespace transport_adapter { + +CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager") + +static std::string GetInterfaceName(unsigned int if_index); +static bool SetNonblocking(int s); + +bool InterfaceStatus::IsAvailable() const { + // check if the interface is UP and RUNNING + return ((flags_ & IFF_UP) > 0) && ((flags_ & IFF_RUNNING) > 0); +} + +bool InterfaceStatus::IsLoopback() const { + return flags_ & IFF_LOOPBACK; +} + +bool InterfaceStatus::HasIPAddress() const { + return has_ipv4_ || has_ipv6_; +} + +std::string InterfaceStatus::GetIPv4Address() const { + char buf[INET_ADDRSTRLEN] = ""; + if (has_ipv4_ && IsAvailable()) { + inet_ntop(AF_INET, &ipv4_address_, buf, sizeof(buf)); + } + return std::string(buf); +} + +std::string InterfaceStatus::GetIPv6Address() const { + char buf[INET6_ADDRSTRLEN] = ""; + if (has_ipv6_ && IsAvailable()) { + inet_ntop(AF_INET6, &ipv6_address_, buf, sizeof(buf)); + } + return std::string(buf); +} + +void InterfaceStatus::SetIPv4Address(struct in_addr* addr) { + if (addr == NULL) { + has_ipv4_ = false; + } else { + ipv4_address_ = *addr; + has_ipv4_ = true; + } +} + +void InterfaceStatus::SetIPv6Address(struct in6_addr* addr) { + if (addr == NULL) { + has_ipv6_ = false; + } else { + ipv6_address_ = *addr; + has_ipv6_ = true; + } +} + +PlatformSpecificNetworkInterfaceListener:: + PlatformSpecificNetworkInterfaceListener( + TcpClientListener* tcp_client_listener, + const std::string designated_interface) + : tcp_client_listener_(tcp_client_listener) + , designated_interface_(designated_interface) + , selected_interface_("") + , notified_ipv4_addr_("") + , notified_ipv6_addr_("") + , socket_(-1) +#ifdef BUILD_TESTS + , testing_(false) +#endif // BUILD_TESTS +{ + pipe_fds_[0] = pipe_fds_[1] = -1; + thread_ = threads::CreateThread("PlatformSpecificNetworkInterfaceListener", + new ListenerThreadDelegate(this)); +} + +PlatformSpecificNetworkInterfaceListener:: + ~PlatformSpecificNetworkInterfaceListener() { + LOG4CXX_AUTO_TRACE(logger_); + + Stop(); + Deinit(); + + delete thread_->delegate(); + threads::DeleteThread(thread_); +} + +bool PlatformSpecificNetworkInterfaceListener::Init() { + LOG4CXX_AUTO_TRACE(logger_); + LOG4CXX_DEBUG(logger_, "Init socket: " << socket_); + if (socket_ >= 0) { + LOG4CXX_WARN(logger_, "Network interface listener is already initialized"); + return false; + } + + socket_ = socket(PF_NETLINK, SOCK_RAW, NETLINK_ROUTE); + if (socket_ == -1) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to create netlink socket"); + return false; + } + + if (!SetNonblocking(socket_)) { + LOG4CXX_WARN(logger_, "Failed to configure netlink socket to non-blocking"); + } + + struct sockaddr_nl addr; + memset(&addr, 0, sizeof(addr)); + addr.nl_family = AF_NETLINK; + addr.nl_pad = 0; + addr.nl_pid = 0; + addr.nl_groups = RTMGRP_LINK | RTMGRP_IPV4_IFADDR | RTMGRP_IPV6_IFADDR; + + if (bind(socket_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) != 0) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to bind netlink socket"); + close(socket_); + socket_ = -1; + return false; + } + + if (pipe(pipe_fds_) != 0) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to create internal pipe"); + close(socket_); + socket_ = -1; + return false; + } + + if (!SetNonblocking(pipe_fds_[0])) { + LOG4CXX_WARN(logger_, "Failed to configure pipe to non-blocking"); + } + + return true; +} + +void PlatformSpecificNetworkInterfaceListener::Deinit() { + LOG4CXX_AUTO_TRACE(logger_); + LOG4CXX_DEBUG(logger_, "Deinit socket: " << socket_); + if (socket_ >= 0) { + close(socket_); + socket_ = -1; + } + if (pipe_fds_[1] >= 0) { + close(pipe_fds_[1]); + pipe_fds_[1] = -1; + } + if (pipe_fds_[0] >= 0) { + close(pipe_fds_[0]); + pipe_fds_[0] = -1; + } +} + +bool PlatformSpecificNetworkInterfaceListener::Start() { + LOG4CXX_AUTO_TRACE(logger_); + + if (socket_ < 0) { + LOG4CXX_WARN(logger_, "Interface listener is not initialized"); + return false; + } + + if (thread_->is_running()) { + LOG4CXX_WARN(logger_, "Interface listener is already started"); + return false; + } + + if (!thread_->start()) { + LOG4CXX_ERROR(logger_, "Failed to start interface listener"); + return false; + } + + LOG4CXX_INFO(logger_, "Network interface listener started"); + return true; +} + +bool PlatformSpecificNetworkInterfaceListener::Stop() { + LOG4CXX_AUTO_TRACE(logger_); + + if (!thread_->is_running()) { + LOG4CXX_DEBUG(logger_, "interface listener is not running"); + return false; + } + + thread_->join(); + + LOG4CXX_INFO(logger_, "Network interface listener stopped"); + return true; +} + +void PlatformSpecificNetworkInterfaceListener::Loop() { + LOG4CXX_AUTO_TRACE(logger_); + + // Initialize status_table_ by acquiring a list of interfaces and their + // current statuses. Also we will notify an event to the listener if IP + // address is already available. + InitializeStatus(); + NotifyIPAddresses(); + + // I am not sure required buffer size for netlink data structures. Most of + // implementation I found online uses 4096 so I followed them. + char buf[4096]; + fd_set rfds; + + while (1) { + FD_ZERO(&rfds); + FD_SET(socket_, &rfds); + FD_SET(pipe_fds_[0], &rfds); + int nfds = socket_ > pipe_fds_[0] ? socket_ : pipe_fds_[0]; + + // wait for some data from netlink socket (socket_) and our internal pipe + int ret = select(nfds + 1, &rfds, NULL, NULL, NULL); + if (ret < 0) { + if (errno == EINTR) { + continue; + } else { + LOG4CXX_WARN(logger_, + "select failed for netlink. Aborting interface listener."); + break; + } + } + + // Received data from internal pipe, indicating StopLoop() is called. + // We'll break the while() loop and eventually exit this thread. + if (FD_ISSET(pipe_fds_[0], &rfds)) { + ret = read(pipe_fds_[0], buf, sizeof(buf)); + if (ret < 0) { + if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) { + LOG4CXX_WARN( + logger_, + "Failed to read from pipe. Aborting interface listener."); + break; + } + } else if (ret == 0) { + LOG4CXX_WARN(logger_, + "Pipe disconnected. Aborting interface listener."); + break; + } else { + LOG4CXX_DEBUG(logger_, "received terminating event through pipe"); + break; + } + } + +#ifdef BUILD_TESTS + if (testing_) { // don't enable events from network interface while testing + continue; + } +#endif // BUILD_TESTS + + // received data from netlink socket + if (FD_ISSET(socket_, &rfds)) { + ret = recv(socket_, buf, sizeof(buf), 0); + if (ret < 0) { + if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) { + LOG4CXX_WARN(logger_, + "Failed to read from netlink socket. Aborting interface " + "listener."); + break; + } + } else if (ret == 0) { + LOG4CXX_WARN( + logger_, + "Netlink socket disconnected. Aborting interface listener."); + break; + } else { + struct nlmsghdr* header = reinterpret_cast<struct nlmsghdr*>(buf); + int len = ret; + + // Parse the stream. We may receive multiple (header + data) pairs at a + // time so we use for-loop to go through. + for (; NLMSG_OK(header, len); header = NLMSG_NEXT(header, len)) { + if (header->nlmsg_type == NLMSG_ERROR) { + LOG4CXX_WARN(logger_, "received error event from netlink"); + break; + } + + std::vector<EventParam> params; + + if (header->nlmsg_type == RTM_NEWLINK || + header->nlmsg_type == RTM_DELLINK) { + // For these events, data part contains an ifinfomsg struct and a + // series of rtattr structures. See rtnetlink(7). + // We are only interested in interface index and flags. + struct ifinfomsg* ifinfo_msg = + reinterpret_cast<struct ifinfomsg*>(NLMSG_DATA(header)); + EventParam param(ifinfo_msg->ifi_index, ifinfo_msg->ifi_flags); + params.push_back(param); + + } else if (header->nlmsg_type == RTM_NEWADDR || + header->nlmsg_type == RTM_DELADDR) { + // For these events, data part contains an ifaddrmsg struct and + // optionally some rtattr structures. We'll extract IP address(es) + // from them. + struct ifaddrmsg* ifaddr_msg = + reinterpret_cast<struct ifaddrmsg*>(NLMSG_DATA(header)); + unsigned int size = IFA_PAYLOAD(header); + params = ParseIFAddrMessage(ifaddr_msg, size); + + } else { + continue; + } + + // update status_table_ based on received data + UpdateStatus(header->nlmsg_type, params); + } + } + + // notify the listener if necessary + NotifyIPAddresses(); + } + } +} + +bool PlatformSpecificNetworkInterfaceListener::StopLoop() { + LOG4CXX_AUTO_TRACE(logger_); + + LOG4CXX_INFO(logger_, "Stopping network interface listener"); + + if (pipe_fds_[1] < 0) { + LOG4CXX_WARN(logger_, "StopLoop called in invalid state"); + return false; + } + + char dummy[1] = {0}; + int ret = write(pipe_fds_[1], dummy, sizeof(dummy)); + if (ret <= 0) { + LOG4CXX_WARN_WITH_ERRNO( + logger_, "Failed to send stop message to interface listener"); + return false; + } + + return true; +} + +bool PlatformSpecificNetworkInterfaceListener::InitializeStatus() { + LOG4CXX_AUTO_TRACE(logger_); + +#ifdef BUILD_TESTS + if (testing_) { + // don't actually call getifaddrs() + return true; + } +#endif // BUILD_TESTS + + struct ifaddrs *if_list, *interface; + if (getifaddrs(&if_list) != 0) { + LOG4CXX_WARN(logger_, + "getifaddr failed, interface status won't be available until " + "a change occurs"); + return false; + } + + // clear existing table + status_table_.clear(); + + for (interface = if_list; interface != NULL; + interface = interface->ifa_next) { + if (interface->ifa_name == NULL || interface->ifa_name[0] == '\0') { + continue; + } + if (interface->ifa_addr == NULL) { + continue; + } + + std::string ifname(interface->ifa_name); + InterfaceStatus& status = status_table_[ifname]; + + switch (interface->ifa_addr->sa_family) { + case AF_INET: { + struct sockaddr_in* addr = + reinterpret_cast<struct sockaddr_in*>(interface->ifa_addr); + status.SetIPv4Address(&addr->sin_addr); + break; + } + case AF_INET6: { + struct sockaddr_in6* addr = + reinterpret_cast<struct sockaddr_in6*>(interface->ifa_addr); + status.SetIPv6Address(&addr->sin6_addr); + break; + } + default: + continue; + } + status.SetFlags(interface->ifa_flags); + } + + freeifaddrs(if_list); + + LOG4CXX_DEBUG(logger_, "Successfully acquired network interface status"); + DumpTable(); + return true; +} + +bool PlatformSpecificNetworkInterfaceListener::UpdateStatus( + uint16_t type, std::vector<EventParam>& params) { + LOG4CXX_AUTO_TRACE(logger_); + + for (std::vector<EventParam>::iterator it = params.begin(); + it != params.end(); + ++it) { + std::string ifname = GetInterfaceName(it->if_index); + if (ifname.empty()) { + continue; + } + + InterfaceStatus& status = status_table_[ifname]; + + switch (type) { + case RTM_NEWLINK: { + LOG4CXX_DEBUG( + logger_, + "netlink event: interface " << ifname << " created or updated"); + status.SetFlags(it->flags); + break; + } + case RTM_DELLINK: + LOG4CXX_DEBUG(logger_, + "netlink event: interface " << ifname << " removed"); + status_table_.erase(ifname); + break; + case RTM_NEWADDR: { + sockaddr* addr = reinterpret_cast<sockaddr*>(&it->address); + if (addr->sa_family == AF_INET) { + sockaddr_in* addr_in = reinterpret_cast<sockaddr_in*>(addr); + status.SetIPv4Address(&addr_in->sin_addr); + LOG4CXX_DEBUG(logger_, + "netlink event: IPv4 address of interface " + << ifname << " updated to " + << status.GetIPv4Address()); + } else if (addr->sa_family == AF_INET6) { + sockaddr_in6* addr_in6 = reinterpret_cast<sockaddr_in6*>(addr); + status.SetIPv6Address(&addr_in6->sin6_addr); + LOG4CXX_DEBUG(logger_, + "netlink event: IPv6 address of interface " + << ifname << " updated to " + << status.GetIPv6Address()); + } + break; + } + case RTM_DELADDR: { + sockaddr* addr = reinterpret_cast<sockaddr*>(&it->address); + if (addr->sa_family == AF_INET) { + LOG4CXX_DEBUG(logger_, + "netlink event: IPv4 address of interface " + << ifname << " removed"); + status.SetIPv4Address(NULL); + } else if (addr->sa_family == AF_INET6) { + LOG4CXX_DEBUG(logger_, + "netlink event: IPv6 address of interface " + << ifname << " removed"); + status.SetIPv6Address(NULL); + } + break; + } + default: + LOG4CXX_WARN(logger_, "Unsupported netlink event (" << type << ")"); + break; + } + } + return true; +} + +void PlatformSpecificNetworkInterfaceListener::NotifyIPAddresses() { + LOG4CXX_AUTO_TRACE(logger_); + + std::string ipv4_addr; + std::string ipv6_addr; + const std::string interface_name = SelectInterface(); + + // note that if interface_name is empty (i.e. no interface is selected), + // the IP addresses will be empty + if (!interface_name.empty()) { + InterfaceStatusTable::iterator it = status_table_.find(interface_name); + if (status_table_.end() != it) { + InterfaceStatus& status = it->second; + ipv4_addr = status.GetIPv4Address(); + ipv6_addr = status.GetIPv6Address(); + } + } + + if (notified_ipv4_addr_ != ipv4_addr || notified_ipv6_addr_ != ipv6_addr) { + LOG4CXX_INFO(logger_, + "IP address updated: \"" << notified_ipv4_addr_ << "\" -> \"" + << ipv4_addr << "\", \"" + << notified_ipv6_addr_ << "\" -> \"" + << ipv6_addr << "\""); + + notified_ipv4_addr_ = ipv4_addr; + notified_ipv6_addr_ = ipv6_addr; + + tcp_client_listener_->OnIPAddressUpdated(notified_ipv4_addr_, + notified_ipv6_addr_); + } +} + +const std::string PlatformSpecificNetworkInterfaceListener::SelectInterface() { + LOG4CXX_AUTO_TRACE(logger_); + + if (!designated_interface_.empty()) { + return designated_interface_; + } + + InterfaceStatusTable::iterator it; + + if (!selected_interface_.empty()) { + // if current network interface is still available and has IP address, then + // we use it + it = status_table_.find(selected_interface_); + if (it != status_table_.end()) { + InterfaceStatus& status = it->second; + if (status.IsAvailable() && status.HasIPAddress()) { + return selected_interface_; + } + } + } + + // pick a network interface that has IP address + for (it = status_table_.begin(); it != status_table_.end(); ++it) { + InterfaceStatus& status = it->second; + // ignore loopback interfaces + if (status.IsLoopback()) { + continue; + } + // if the interface has to be UP and RUNNING, and must have an IP address + if (!(status.IsAvailable() && status.HasIPAddress())) { + continue; + } + + selected_interface_ = it->first; + LOG4CXX_DEBUG(logger_, + "selecting network interface: " << selected_interface_); + return selected_interface_; + } + + selected_interface_ = ""; + return selected_interface_; +} + +std::vector<PlatformSpecificNetworkInterfaceListener::EventParam> +PlatformSpecificNetworkInterfaceListener::ParseIFAddrMessage( + struct ifaddrmsg* message, unsigned int size) { + LOG4CXX_AUTO_TRACE(logger_); + + std::vector<EventParam> params; + + // Iterate through rtattr structs. (The first one can be acquired through + // IFA_RTA() macro) + for (struct rtattr* attr = IFA_RTA(message); RTA_OK(attr, size); + attr = RTA_NEXT(attr, size)) { + if (!(attr->rta_type == IFA_LOCAL || attr->rta_type == IFA_ADDRESS)) { + continue; + } + + EventParam param(message->ifa_index); + + if (message->ifa_family == AF_INET) { + // make sure the size of data is >= 4 bytes + if (RTA_PAYLOAD(attr) < sizeof(struct in_addr)) { + LOG4CXX_DEBUG(logger_, + "Invalid netlink event: insufficient IPv4 address data"); + continue; + } + + // Data part of rtattr contains IPv4 address. Copy it to param.address + struct in_addr* ipv4_addr = + reinterpret_cast<struct in_addr*>(RTA_DATA(attr)); + + struct sockaddr_in* sockaddr = + reinterpret_cast<struct sockaddr_in*>(¶m.address); + sockaddr->sin_family = AF_INET; + sockaddr->sin_addr = *ipv4_addr; + + } else if (message->ifa_family == AF_INET6) { + // make sure the size of data is >= 16 bytes + if (RTA_PAYLOAD(attr) < sizeof(struct in6_addr)) { + LOG4CXX_DEBUG(logger_, + "Invalid netlink event: insufficient IPv6 address data"); + continue; + } + + // Data part of rtattr contains IPv6 address. Copy it to param.address + struct in6_addr* ipv6_addr = + reinterpret_cast<struct in6_addr*>(RTA_DATA(attr)); + + struct sockaddr_in6* sockaddr = + reinterpret_cast<struct sockaddr_in6*>(¶m.address); + sockaddr->sin6_family = AF_INET6; + sockaddr->sin6_addr = *ipv6_addr; + + } else { + LOG4CXX_WARN(logger_, + "Unsupported family (" << message->ifa_family << ")"); + continue; + } + + params.push_back(param); + } + + return params; +} + +void PlatformSpecificNetworkInterfaceListener::DumpTable() const { + LOG4CXX_DEBUG(logger_, + "Number of network interfaces: " << status_table_.size()); + + for (auto it = status_table_.begin(); it != status_table_.end(); ++it) { + const std::string ifname = it->first; + const InterfaceStatus& status = it->second; + UNUSED(status); + + LOG4CXX_DEBUG( + logger_, + " " << ifname << " : flags=" << status.GetFlags() + << " : available: " << (status.IsAvailable() ? "yes" : "no") + << " IPv4: " << status.GetIPv4Address() + << " IPv6: " << status.GetIPv6Address() + << (status.IsLoopback() ? " (loopback)" : "")); + } +} + +PlatformSpecificNetworkInterfaceListener::ListenerThreadDelegate:: + ListenerThreadDelegate(PlatformSpecificNetworkInterfaceListener* parent) + : parent_(parent) {} + +void PlatformSpecificNetworkInterfaceListener::ListenerThreadDelegate:: + threadMain() { + parent_->Loop(); +} + +void PlatformSpecificNetworkInterfaceListener::ListenerThreadDelegate:: + exitThreadMain() { + parent_->StopLoop(); +} + +static std::string GetInterfaceName(unsigned int if_index) { + char buf[IFNAMSIZ + 1] = ""; + if_indextoname(if_index, buf); + return std::string(buf); +} + +static bool SetNonblocking(int s) { + int prev_flag = fcntl(s, F_GETFL, 0); + if (prev_flag == -1) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to acquire socket flag"); + return false; + } + + int ret = fcntl(s, F_SETFL, prev_flag | O_NONBLOCK); + if (ret == -1) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, + "Failed to configure socket to non-blocking"); + return false; + } + + return true; +} + +} // namespace transport_adapter +} // namespace transport_manager diff --git a/src/components/transport_manager/src/tcp/platform_specific/qnx/platform_specific_network_interface_listener.cc b/src/components/transport_manager/src/tcp/platform_specific/qnx/platform_specific_network_interface_listener.cc new file mode 100644 index 0000000000..15b3814999 --- /dev/null +++ b/src/components/transport_manager/src/tcp/platform_specific/qnx/platform_specific_network_interface_listener.cc @@ -0,0 +1,38 @@ +#include "transport_manager/tcp/platform_specific/qnx/platform_specific_network_interface_listener_impl.h" + +namespace transport_manager { +namespace transport_adapter { + +CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager") + +PlatformSpecificNetworkInterfaceListener:: + PlatformSpecificNetworkInterfaceListener( + TcpClientListener* tcp_client_listener, + const std::string designated_interface) {} + +PlatformSpecificNetworkInterfaceListener:: + ~PlatformSpecificNetworkInterfaceListener() { + LOG4CXX_AUTO_TRACE(logger_); +} + +bool PlatformSpecificNetworkInterfaceListener::Init() { + LOG4CXX_AUTO_TRACE(logger_); + return true; +} + +void PlatformSpecificNetworkInterfaceListener::Deinit() { + LOG4CXX_AUTO_TRACE(logger_); +} + +bool PlatformSpecificNetworkInterfaceListener::Start() { + LOG4CXX_AUTO_TRACE(logger_); + return true; +} + +bool PlatformSpecificNetworkInterfaceListener::Stop() { + LOG4CXX_AUTO_TRACE(logger_); + return true; +} + +} // namespace transport_adapter +} // namespace transport_manager diff --git a/src/components/transport_manager/src/tcp/tcp_client_listener.cc b/src/components/transport_manager/src/tcp/tcp_client_listener.cc index 207149eb8c..c2cbac4e13 100644 --- a/src/components/transport_manager/src/tcp/tcp_client_listener.cc +++ b/src/components/transport_manager/src/tcp/tcp_client_listener.cc @@ -33,103 +33,123 @@ #include "transport_manager/tcp/tcp_client_listener.h" +#include <arpa/inet.h> +#include <errno.h> +#include <fcntl.h> +#include <ifaddrs.h> #include <memory.h> #include <signal.h> -#include <errno.h> -#include <arpa/inet.h> -#include <unistd.h> -#include <sys/types.h> -#include <sys/sysctl.h> +#include <sys/select.h> #include <sys/socket.h> +#include <sys/sysctl.h> +#include <sys/types.h> +#include <unistd.h> #ifdef __linux__ #include <linux/tcp.h> #else // __linux__ -#include <sys/time.h> #include <netinet/in.h> #include <netinet/tcp.h> #include <netinet/tcp_var.h> +#include <sys/time.h> #endif // __linux__ #include <sstream> #include "utils/logger.h" -#include "utils/make_shared.h" -#include "utils/threads/thread.h" -#include "transport_manager/transport_adapter/transport_adapter_controller.h" + +#include "transport_manager/tcp/network_interface_listener_impl.h" #include "transport_manager/tcp/tcp_device.h" #include "transport_manager/tcp/tcp_socket_connection.h" +#include "transport_manager/transport_adapter/transport_adapter_controller.h" +#include "utils/threads/thread.h" namespace transport_manager { namespace transport_adapter { CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager") +static bool SetNonblocking(int s); + +#ifdef BUILD_TESTS +bool TcpClientListener::testing_ = false; +#endif // BUILD_TESTS + TcpClientListener::TcpClientListener(TransportAdapterController* controller, const uint16_t port, - const bool enable_keepalive) + const bool enable_keepalive, + const std::string designated_interface) : port_(port) , enable_keepalive_(enable_keepalive) , controller_(controller) + , initialized_(false) + , started_(false) , thread_(0) , socket_(-1) - , thread_stop_requested_(false) { + , thread_stop_requested_(false) + , designated_interface_(designated_interface) { + pipe_fds_[0] = pipe_fds_[1] = -1; thread_ = threads::CreateThread("TcpClientListener", new ListeningThreadDelegate(this)); + interface_listener_ = + new NetworkInterfaceListenerImpl(this, designated_interface); } TransportAdapter::Error TcpClientListener::Init() { LOG4CXX_AUTO_TRACE(logger_); thread_stop_requested_ = false; - socket_ = socket(AF_INET, SOCK_STREAM, 0); - if (-1 == socket_) { - LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to create socket"); - return TransportAdapter::FAIL; - } - - sockaddr_in server_address = {0}; - server_address.sin_family = AF_INET; - server_address.sin_port = htons(port_); - server_address.sin_addr.s_addr = INADDR_ANY; - - int optval = 1; - if (0 != - setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval))) { - LOG4CXX_WARN_WITH_ERRNO(logger_, "setsockopt SO_REUSEADDR failed"); + if (!IsListeningOnSpecificInterface()) { + // Network interface is not specified. We will listen on all interfaces + // using INADDR_ANY. If socket creation fails, we will treat it an error. + socket_ = CreateIPv4ServerSocket(port_); + if (-1 == socket_) { + LOG4CXX_ERROR(logger_, "Failed to create TCP socket"); + return TransportAdapter::FAIL; + } + } else { + // Network interface is specified and we wiill listen only on the interface. + // In this case, the server socket will be created once + // NetworkInterfaceListener notifies the interface's IP address. + LOG4CXX_INFO(logger_, + "TCP server socket will listen on " + << designated_interface_ + << " once it has an IPv4 address."); } - if (bind(socket_, - reinterpret_cast<sockaddr*>(&server_address), - sizeof(server_address)) != 0) { - LOG4CXX_ERROR_WITH_ERRNO(logger_, "bind() failed"); + if (!interface_listener_->Init()) { + if (socket_ >= 0) { + close(socket_); + socket_ = -1; + } return TransportAdapter::FAIL; } - const int kBacklog = 128; - if (0 != listen(socket_, kBacklog)) { - LOG4CXX_ERROR_WITH_ERRNO(logger_, "listen() failed"); - return TransportAdapter::FAIL; - } + initialized_ = true; return TransportAdapter::OK; } void TcpClientListener::Terminate() { LOG4CXX_AUTO_TRACE(logger_); - if (socket_ == -1) { - LOG4CXX_WARN(logger_, "Socket has been closed"); + + if (!initialized_) { return; } - if (shutdown(socket_, SHUT_RDWR) != 0) { - LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to shutdown socket"); - } - if (close(socket_) != 0) { - LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to close socket"); + + if (!IsListeningOnSpecificInterface()) { + DestroyServerSocket(socket_); + socket_ = -1; + } else { + sync_primitives::AutoLock auto_lock(start_stop_lock_); + DestroyServerSocket(socket_); + socket_ = -1; } - socket_ = -1; + + interface_listener_->Deinit(); + initialized_ = false; } bool TcpClientListener::IsInitialised() const { - return thread_; + return initialized_; } TcpClientListener::~TcpClientListener() { @@ -138,6 +158,7 @@ TcpClientListener::~TcpClientListener() { delete thread_->delegate(); threads::DeleteThread(thread_); Terminate(); + delete interface_listener_; } void SetKeepaliveOptions(const int fd) { @@ -203,108 +224,206 @@ void SetKeepaliveOptions(const int fd) { void TcpClientListener::Loop() { LOG4CXX_AUTO_TRACE(logger_); - while (!thread_stop_requested_) { - sockaddr_in client_address; - socklen_t client_address_size = sizeof(client_address); - const int connection_fd = accept( - socket_, (struct sockaddr*)&client_address, &client_address_size); - if (thread_stop_requested_) { - LOG4CXX_DEBUG(logger_, "thread_stop_requested_"); - close(connection_fd); - break; - } - - if (connection_fd < 0) { - LOG4CXX_ERROR_WITH_ERRNO(logger_, "accept() failed"); - continue; - } + fd_set rfds; + char dummy[16]; - if (AF_INET != client_address.sin_family) { - LOG4CXX_DEBUG(logger_, "Address of connected client is invalid"); - close(connection_fd); - continue; + while (!thread_stop_requested_) { + FD_ZERO(&rfds); + FD_SET(socket_, &rfds); + FD_SET(pipe_fds_[0], &rfds); + int nfds = socket_ > pipe_fds_[0] ? socket_ : pipe_fds_[0]; + + int ret = select(nfds + 1, &rfds, NULL, NULL, NULL); + if (ret < 0) { + if (errno == EINTR) { + continue; + } else { + LOG4CXX_WARN(logger_, "select failed for TCP server socket"); + break; + } } - char device_name[32]; - strncpy(device_name, - inet_ntoa(client_address.sin_addr), - sizeof(device_name) / sizeof(device_name[0])); - LOG4CXX_INFO(logger_, "Connected client " << device_name); - LOG4CXX_INFO(logger_, "Port is: " << port_); - - if (enable_keepalive_) { - SetKeepaliveOptions(connection_fd); + if (FD_ISSET(pipe_fds_[0], &rfds)) { + ret = read(pipe_fds_[0], dummy, sizeof(dummy)); + if (ret < 0) { + if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) { + LOG4CXX_WARN( + logger_, + "Failed to read from pipe, aborting TCP server socket loop."); + break; + } + } else if (ret == 0) { + LOG4CXX_WARN(logger_, + "Pipe disconnected, aborting TCP server socket loop."); + break; + } else { + LOG4CXX_DEBUG(logger_, + "received stop command of TCP server socket loop"); + break; + } } - const auto device_uid = - device_name + std::string(":") + std::to_string(port_); - -#if defined(BUILD_TESTS) - TcpDevice* tcp_device = - new TcpDevice(client_address.sin_addr.s_addr, device_uid, device_name); + if (FD_ISSET(socket_, &rfds)) { + sockaddr_in client_address; + socklen_t client_address_size = sizeof(client_address); + const int connection_fd = accept( + socket_, (struct sockaddr*)&client_address, &client_address_size); + if (thread_stop_requested_) { + LOG4CXX_DEBUG(logger_, "thread_stop_requested_"); + close(connection_fd); + break; + } + + if (connection_fd < 0) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, "accept() failed"); + continue; + } + + if (AF_INET != client_address.sin_family) { + LOG4CXX_DEBUG(logger_, "Address of connected client is invalid"); + close(connection_fd); + continue; + } + + char device_name[32]; + size_t size = sizeof(device_name) / sizeof(device_name[0]); + strncpy(device_name, inet_ntoa(client_address.sin_addr), size); + + device_name[size - 1] = '\0'; + LOG4CXX_INFO(logger_, "Connected client " << device_name); + LOG4CXX_INFO(logger_, "Port is: " << port_); + + if (enable_keepalive_) { + SetKeepaliveOptions(connection_fd); + } + + const auto device_uid = + device_name + std::string(":") + std::to_string(port_); + +#if defined(ENABLE_IAP2EMULATION) + auto tcp_device = std::make_shared<TcpDevice>( + client_address.sin_addr.s_addr, device_uid, device_name); #else - TcpDevice* tcp_device = - new TcpDevice(client_address.sin_addr.s_addr, device_uid); -#endif // BUILD_TESTS - - DeviceSptr device = controller_->AddDevice(tcp_device); - tcp_device = static_cast<TcpDevice*>(device.get()); - const ApplicationHandle app_handle = - tcp_device->AddIncomingApplication(connection_fd); - - utils::SharedPtr<TcpSocketConnection> connection = - utils::MakeShared<TcpSocketConnection>( - device->unique_device_id(), app_handle, controller_); - controller_->ConnectionCreated( - connection, device->unique_device_id(), app_handle); - connection->set_socket(connection_fd); - const TransportAdapter::Error error = connection->Start(); - if (TransportAdapter::OK != error) { - LOG4CXX_ERROR(logger_, - "TCP connection::Start() failed with error: " << error); + auto tcp_device = std::make_shared<TcpDevice>( + client_address.sin_addr.s_addr, device_uid); +#endif // ENABLE_IAP2EMULATION + + DeviceSptr device = controller_->AddDevice(tcp_device); + auto tcp_device_raw = static_cast<TcpDevice*>(device.get()); + const ApplicationHandle app_handle = + tcp_device_raw->AddIncomingApplication(connection_fd); + + std::shared_ptr<TcpSocketConnection> connection = + std::make_shared<TcpSocketConnection>( + device->unique_device_id(), app_handle, controller_); + controller_->ConnectionCreated( + connection, device->unique_device_id(), app_handle); + connection->set_socket(connection_fd); + const TransportAdapter::Error error = connection->Start(); + if (TransportAdapter::OK != error) { + LOG4CXX_ERROR(logger_, + "TCP connection::Start() failed with error: " << error); + } } } + + LOG4CXX_INFO(logger_, "TCP server socket loop is terminated."); } void TcpClientListener::StopLoop() { LOG4CXX_AUTO_TRACE(logger_); + if (pipe_fds_[1] < 0) { + LOG4CXX_WARN(logger_, "StopLoop called in invalid state"); + return; + } + thread_stop_requested_ = true; - // We need to connect to the listening socket to unblock accept() call - int byesocket = socket(AF_INET, SOCK_STREAM, 0); - sockaddr_in server_address = {0}; - server_address.sin_family = AF_INET; - server_address.sin_port = htons(port_); - server_address.sin_addr.s_addr = INADDR_ANY; - if (0 != connect(byesocket, - reinterpret_cast<sockaddr*>(&server_address), - sizeof(server_address))) { - LOG4CXX_WARN_WITH_ERRNO(logger_, "Failed to connect byesocket"); - } else { - // Can only shutdown socket if connected - if (0 != shutdown(byesocket, SHUT_RDWR)) { - LOG4CXX_WARN_WITH_ERRNO(logger_, "Failed to shutdown byesocket"); - } + + char dummy[1] = {0}; + int ret = write(pipe_fds_[1], dummy, sizeof(dummy)); + if (ret <= 0) { + LOG4CXX_WARN_WITH_ERRNO( + logger_, "Failed to send stop message to TCP server socket loop"); } - close(byesocket); } TransportAdapter::Error TcpClientListener::StartListening() { LOG4CXX_AUTO_TRACE(logger_); - if (thread_->is_running()) { + if (started_) { LOG4CXX_WARN( logger_, "TransportAdapter::BAD_STATE. Listener has already been started"); return TransportAdapter::BAD_STATE; } - if (!thread_->start()) { - LOG4CXX_ERROR(logger_, "Tcp client listener thread start failed"); + if (!interface_listener_->Start()) { return TransportAdapter::FAIL; } + + if (!IsListeningOnSpecificInterface()) { + TransportAdapter::Error ret = StartListeningThread(); + if (TransportAdapter::OK != ret) { + LOG4CXX_ERROR(logger_, "Tcp client listener thread start failed"); + interface_listener_->Stop(); + return ret; + } + } + + started_ = true; LOG4CXX_INFO(logger_, "Tcp client listener has started successfully"); return TransportAdapter::OK; } +TransportAdapter::Error TcpClientListener::ResumeListening() { + LOG4CXX_AUTO_TRACE(logger_); + + interface_listener_->Init(); + StartListeningThread(); + started_ = true; + + LOG4CXX_INFO(logger_, "Tcp client listener was resumed successfully"); + return TransportAdapter::OK; +} + +TransportAdapter::Error TcpClientListener::StopListening() { + LOG4CXX_AUTO_TRACE(logger_); + if (!started_) { + LOG4CXX_DEBUG(logger_, "TcpClientListener is not running now"); + return TransportAdapter::BAD_STATE; + } + + interface_listener_->Stop(); + + StopListeningThread(); + + started_ = false; + LOG4CXX_INFO(logger_, "Tcp client listener was stopped successfully"); + return TransportAdapter::OK; +} + +TransportAdapter::Error TcpClientListener::SuspendListening() { + LOG4CXX_AUTO_TRACE(logger_); + if (!started_) { + LOG4CXX_DEBUG(logger_, "TcpClientListener is not running now"); + return TransportAdapter::BAD_STATE; + } + + if (shutdown(socket_, SHUT_RDWR) != 0) { + LOG4CXX_WARN(logger_, "Socket was unable to be shutdowned"); + } + + if (close(socket_) != 0) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to close socket"); + } + + interface_listener_->Deinit(); + StopListeningThread(); + started_ = false; + + LOG4CXX_INFO(logger_, "Tcp client listener was suspended"); + return TransportAdapter::OK; +} + void TcpClientListener::ListeningThreadDelegate::exitThreadMain() { parent_->StopLoop(); } @@ -317,18 +436,272 @@ TcpClientListener::ListeningThreadDelegate::ListeningThreadDelegate( TcpClientListener* parent) : parent_(parent) {} -TransportAdapter::Error TcpClientListener::StopListening() { +TransportAdapter::Error TcpClientListener::StartListeningThread() { LOG4CXX_AUTO_TRACE(logger_); - if (!thread_->is_running()) { - LOG4CXX_DEBUG(logger_, "TcpClientListener is not running now"); - return TransportAdapter::BAD_STATE; + + // StartListening() can be called from multiple threads + sync_primitives::AutoLock auto_lock(start_stop_lock_); + + if (pipe_fds_[0] < 0 || pipe_fds_[1] < 0) { + // recreate the pipe every time, so that the thread loop will not get + // leftover + // data inside pipe after it is started + if (pipe(pipe_fds_) != 0) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to create internal pipe"); + return TransportAdapter::FAIL; + } + if (!SetNonblocking(pipe_fds_[0])) { + LOG4CXX_WARN(logger_, "Failed to configure pipe to non-blocking"); + } } + thread_stop_requested_ = false; + + if (!thread_->start()) { + return TransportAdapter::FAIL; + } + return TransportAdapter::OK; +} + +TransportAdapter::Error TcpClientListener::StopListeningThread() { + LOG4CXX_AUTO_TRACE(logger_); + + // StopListening() can be called from multiple threads + sync_primitives::AutoLock auto_lock(start_stop_lock_); + thread_->join(); - LOG4CXX_INFO(logger_, "Tcp client listener has stopped successfully"); + close(pipe_fds_[1]); + pipe_fds_[1] = -1; + close(pipe_fds_[0]); + pipe_fds_[0] = -1; + return TransportAdapter::OK; } +void TcpClientListener::OnIPAddressUpdated(const std::string ipv4_addr, + const std::string ipv6_addr) { + LOG4CXX_AUTO_TRACE(logger_); + + // Since we only create a TCP socket with IPv4 option (AF_INET), currently we + // do not use IPv6 address. + if (ipv4_addr != current_ip_address_) { + if (IsListeningOnSpecificInterface()) { + if (!current_ip_address_.empty()) { + // the server socket is running, terminate it + LOG4CXX_DEBUG( + logger_, + "Stopping current TCP server socket on " << designated_interface_); + StopOnNetworkInterface(); + } + if (!ipv4_addr.empty()) { + // start (or restart) server socket with the new IP address + LOG4CXX_DEBUG( + logger_, "Starting TCP server socket on " << designated_interface_); + StartOnNetworkInterface(); + } + } + + current_ip_address_ = ipv4_addr; + + std::string enabled = !current_ip_address_.empty() ? "true" : "false"; + std::ostringstream oss; + oss << port_; + + TransportConfig config; + config.insert(std::make_pair(tc_enabled, enabled)); + config.insert(std::make_pair(tc_tcp_ip_address, current_ip_address_)); + config.insert(std::make_pair(tc_tcp_port, oss.str())); + + controller_->TransportConfigUpdated(config); + } +} + +bool TcpClientListener::StartOnNetworkInterface() { + LOG4CXX_AUTO_TRACE(logger_); + + // this method is only for the case that network interface is specified + if (IsListeningOnSpecificInterface()) { + { + // make sure that two threads will not update socket_ at the same time + sync_primitives::AutoLock auto_lock(start_stop_lock_); + if (socket_ < 0) { + socket_ = CreateIPv4ServerSocket(port_, designated_interface_); + if (-1 == socket_) { + LOG4CXX_WARN(logger_, "Failed to create TCP socket"); + return false; + } + } + } + + if (TransportAdapter::OK != StartListeningThread()) { + LOG4CXX_WARN(logger_, "Failed to start TCP client listener"); + return false; + } + LOG4CXX_INFO(logger_, + "TCP server socket started on " << designated_interface_); + } + return true; +} + +bool TcpClientListener::StopOnNetworkInterface() { + LOG4CXX_AUTO_TRACE(logger_); + + if (IsListeningOnSpecificInterface()) { + if (TransportAdapter::OK != StopListeningThread()) { + LOG4CXX_WARN(logger_, "Failed to stop TCP client listener"); + return false; + } + + { + sync_primitives::AutoLock auto_lock(start_stop_lock_); + DestroyServerSocket(socket_); + socket_ = -1; + } + + LOG4CXX_INFO( + logger_, + "TCP server socket on " << designated_interface_ << " stopped"); + } + return true; +} + +bool TcpClientListener::IsListeningOnSpecificInterface() const { + return !designated_interface_.empty(); +} + +int TcpClientListener::CreateIPv4ServerSocket( + uint16_t port, const std::string interface_name) { + LOG4CXX_AUTO_TRACE(logger_); + + struct in_addr ipv4_address; + memset(&ipv4_address, 0, sizeof(ipv4_address)); + if (interface_name.empty()) { + ipv4_address.s_addr = htonl(INADDR_ANY); + } else if (!GetIPv4Address(interface_name, &ipv4_address)) { + return -1; + } + + int sock = socket(AF_INET, SOCK_STREAM, 0); + if (-1 == sock) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to create socket"); + return -1; + } + + sockaddr_in server_address = {0}; + server_address.sin_family = AF_INET; + server_address.sin_port = htons(port); + server_address.sin_addr = ipv4_address; + + int optval = 1; + if (0 != + setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval))) { + LOG4CXX_WARN_WITH_ERRNO(logger_, "setsockopt SO_REUSEADDR failed"); + } + + if (bind(sock, + reinterpret_cast<sockaddr*>(&server_address), + sizeof(server_address)) != 0) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, "bind() failed"); + close(sock); + return -1; + } + + const int kBacklog = 128; + if (0 != listen(sock, kBacklog)) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, "listen() failed"); + close(sock); + return -1; + } + + return sock; +} + +void TcpClientListener::DestroyServerSocket(int sock) { + LOG4CXX_AUTO_TRACE(logger_); + if (sock >= 0) { + if (shutdown(sock, SHUT_RDWR) != 0) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to shutdown socket"); + } + if (close(sock) != 0) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to close socket"); + } + } +} + +bool TcpClientListener::GetIPv4Address(const std::string interface_name, + struct in_addr* ip_address) { + LOG4CXX_AUTO_TRACE(logger_); + +#ifdef BUILD_TESTS + if (testing_) { + // don't actually call getifaddrs(), instead return a dummy address of + // INADDR_LOOPBACK + struct in_addr dummy_addr; + dummy_addr.s_addr = htonl(INADDR_LOOPBACK); + + if (ip_address != NULL) { + *ip_address = dummy_addr; + } + return true; + } +#endif // BUILD_TESTS + + struct ifaddrs* if_list; + if (getifaddrs(&if_list) != 0) { + LOG4CXX_WARN(logger_, "getifaddrs failed"); + return false; + } + + struct ifaddrs* interface; + bool found = false; + + for (interface = if_list; interface != NULL; + interface = interface->ifa_next) { + if (interface->ifa_name == NULL) { + continue; + } + if (interface_name == interface->ifa_name) { + if (interface->ifa_addr == NULL) { + continue; + } + switch (interface->ifa_addr->sa_family) { + case AF_INET: { + struct sockaddr_in* addr = + reinterpret_cast<struct sockaddr_in*>(interface->ifa_addr); + if (ip_address != NULL) { + *ip_address = addr->sin_addr; + } + found = true; + break; + } + default: + break; + } + } + } + + freeifaddrs(if_list); + + return found; +} + +static bool SetNonblocking(int s) { + int prev_flag = fcntl(s, F_GETFL, 0); + if (prev_flag == -1) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, "Failed to acquire socket flag"); + return false; + } + + int ret = fcntl(s, F_SETFL, prev_flag | O_NONBLOCK); + if (ret == -1) { + LOG4CXX_ERROR_WITH_ERRNO(logger_, + "Failed to configure socket to non-blocking"); + return false; + } + + return true; +} + } // namespace transport_adapter } // namespace transport_manager diff --git a/src/components/transport_manager/src/tcp/tcp_connection_factory.cc b/src/components/transport_manager/src/tcp/tcp_connection_factory.cc index 114425076a..50d1c74fa7 100644 --- a/src/components/transport_manager/src/tcp/tcp_connection_factory.cc +++ b/src/components/transport_manager/src/tcp/tcp_connection_factory.cc @@ -30,12 +30,11 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#include "transport_manager/transport_adapter/transport_adapter_controller.h" #include "transport_manager/tcp/tcp_connection_factory.h" #include "transport_manager/tcp/tcp_server_originated_socket_connection.h" +#include "transport_manager/transport_adapter/transport_adapter_controller.h" #include "utils/logger.h" -#include "utils/make_shared.h" namespace transport_manager { namespace transport_adapter { @@ -53,11 +52,11 @@ TransportAdapter::Error TcpConnectionFactory::Init() { TransportAdapter::Error TcpConnectionFactory::CreateConnection( const DeviceUID& device_uid, const ApplicationHandle& app_handle) { LOG4CXX_AUTO_TRACE(logger_); - LOG4CXX_DEBUG(logger_, - "DeviceUID: " << &device_uid - << ", ApplicationHandle: " << &app_handle); - utils::SharedPtr<TcpServerOriginatedSocketConnection> connection = - utils::MakeShared<TcpServerOriginatedSocketConnection>( + LOG4CXX_DEBUG( + logger_, + "DeviceUID: " << &device_uid << ", ApplicationHandle: " << &app_handle); + std::shared_ptr<TcpServerOriginatedSocketConnection> connection = + std::make_shared<TcpServerOriginatedSocketConnection>( device_uid, app_handle, controller_); controller_->ConnectionCreated(connection, device_uid, app_handle); const TransportAdapter::Error error = connection->Start(); diff --git a/src/components/transport_manager/src/tcp/tcp_device.cc b/src/components/transport_manager/src/tcp/tcp_device.cc index dbcb5d38cb..16abdfc22d 100644 --- a/src/components/transport_manager/src/tcp/tcp_device.cc +++ b/src/components/transport_manager/src/tcp/tcp_device.cc @@ -30,8 +30,8 @@ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ -#include "utils/logger.h" #include "transport_manager/tcp/tcp_device.h" +#include "utils/logger.h" namespace transport_manager { namespace transport_adapter { @@ -46,7 +46,7 @@ TcpDevice::TcpDevice(const in_addr_t& in_addr, const std::string& name) LOG4CXX_AUTO_TRACE(logger_); } -#if defined(BUILD_TESTS) +#if defined(ENABLE_IAP2EMULATION) TcpDevice::TcpDevice(const in_addr_t& in_addr, const std::string& device_uid, const std::string& transport_switch_id) @@ -57,11 +57,11 @@ TcpDevice::TcpDevice(const in_addr_t& in_addr, LOG4CXX_AUTO_TRACE(logger_); LOG4CXX_DEBUG(logger_, "Device created with transport switch emulation support."); - LOG4CXX_DEBUG(logger_, - "Device parameters: " << device_uid << " / " - << transport_switch_id); + LOG4CXX_DEBUG( + logger_, + "Device parameters: " << device_uid << " / " << transport_switch_id); } -#endif // BUILD_TESTS +#endif // ENABLE_IAP2EMULATION bool TcpDevice::IsSameAs(const Device* other) const { LOG4CXX_AUTO_TRACE(logger_); diff --git a/src/components/transport_manager/src/tcp/tcp_server_originated_socket_connection.cc b/src/components/transport_manager/src/tcp/tcp_server_originated_socket_connection.cc index 516f2d3ec4..690b2d25b3 100644 --- a/src/components/transport_manager/src/tcp/tcp_server_originated_socket_connection.cc +++ b/src/components/transport_manager/src/tcp/tcp_server_originated_socket_connection.cc @@ -56,7 +56,7 @@ bool TcpServerOriginatedSocketConnection::Establish(ConnectError** error) { DCHECK(error); LOG4CXX_DEBUG(logger_, "error " << error); DeviceSptr device = controller()->FindDevice(device_handle()); - if (!device.valid()) { + if (device.use_count() == 0) { LOG4CXX_ERROR(logger_, "Device " << device_handle() << " not found"); *error = new ConnectError(); return false; @@ -65,9 +65,9 @@ bool TcpServerOriginatedSocketConnection::Establish(ConnectError** error) { const int port = tcp_device->GetApplicationPort(application_handle()); if (-1 == port) { - LOG4CXX_ERROR(logger_, - "Application port for " << application_handle() - << " not found"); + LOG4CXX_ERROR( + logger_, + "Application port for " << application_handle() << " not found"); *error = new ConnectError(); return false; } diff --git a/src/components/transport_manager/src/tcp/tcp_transport_adapter.cc b/src/components/transport_manager/src/tcp/tcp_transport_adapter.cc index 0e9e63263b..54eb3a7b6d 100644 --- a/src/components/transport_manager/src/tcp/tcp_transport_adapter.cc +++ b/src/components/transport_manager/src/tcp/tcp_transport_adapter.cc @@ -32,19 +32,19 @@ #include "transport_manager/tcp/tcp_transport_adapter.h" +#include <errno.h> #include <memory.h> #include <signal.h> -#include <errno.h> #include <stdio.h> #include <cstdlib> #include <sstream> -#include "utils/logger.h" -#include "utils/threads/thread_delegate.h" #include "transport_manager/tcp/tcp_client_listener.h" #include "transport_manager/tcp/tcp_connection_factory.h" #include "transport_manager/tcp/tcp_device.h" +#include "utils/logger.h" +#include "utils/threads/thread_delegate.h" namespace transport_manager { namespace transport_adapter { @@ -53,16 +53,37 @@ CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager") TcpTransportAdapter::TcpTransportAdapter( const uint16_t port, - resumption::LastState& last_state, + resumption::LastStateWrapperPtr last_state_wrapper, const TransportManagerSettings& settings) - : TransportAdapterImpl(NULL, - new TcpConnectionFactory(this), - new TcpClientListener(this, port, true), - last_state, - settings) {} + : TransportAdapterImpl( + NULL, + new TcpConnectionFactory(this), + new TcpClientListener( + this, + port, + true, + settings.transport_manager_tcp_adapter_network_interface()), + last_state_wrapper, + settings) {} TcpTransportAdapter::~TcpTransportAdapter() {} +void TcpTransportAdapter::TransportConfigUpdated( + const TransportConfig& new_config) { + LOG4CXX_AUTO_TRACE(logger_); + + transport_config_ = new_config; + + // call the method of parent class to trigger OnTransportConfigUpdated() for + // the listeners + TransportAdapterImpl::TransportConfigUpdated(new_config); +} + +TransportConfig TcpTransportAdapter::GetTransportConfiguration() const { + LOG4CXX_AUTO_TRACE(logger_); + return transport_config_; +} + DeviceType TcpTransportAdapter::GetDeviceType() const { return TCP; } @@ -79,8 +100,8 @@ void TcpTransportAdapter::Store() const { if (!device) { // device could have been disconnected continue; } - utils::SharedPtr<TcpDevice> tcp_device = - DeviceSptr::static_pointer_cast<TcpDevice>(device); + std::shared_ptr<TcpDevice> tcp_device = + std::static_pointer_cast<TcpDevice>(device); Json::Value device_dictionary; device_dictionary["name"] = tcp_device->name(); struct in_addr address; @@ -110,20 +131,22 @@ void TcpTransportAdapter::Store() const { } } tcp_adapter_dictionary["devices"] = devices_dictionary; - Json::Value& dictionary = last_state().get_dictionary(); + resumption::LastStateAccessor accessor = last_state_wrapper_->get_accessor(); + Json::Value dictionary = accessor.GetData().dictionary(); dictionary["TransportManager"]["TcpAdapter"] = tcp_adapter_dictionary; + accessor.GetMutableData().set_dictionary(dictionary); } bool TcpTransportAdapter::Restore() { LOG4CXX_AUTO_TRACE(logger_); bool errors_occurred = false; + resumption::LastStateAccessor accessor = last_state_wrapper_->get_accessor(); + Json::Value dictionary = accessor.GetData().dictionary(); const Json::Value tcp_adapter_dictionary = - last_state().get_dictionary()["TransportManager"]["TcpAdapter"]; + dictionary["TransportManager"]["TcpAdapter"]; const Json::Value devices_dictionary = tcp_adapter_dictionary["devices"]; - for (Json::Value::const_iterator i = devices_dictionary.begin(); - i != devices_dictionary.end(); - ++i) { - const Json::Value device_dictionary = *i; + for (const auto& tcp_adapter_device : devices_dictionary) { + const Json::Value device_dictionary = tcp_adapter_device; std::string name = device_dictionary["name"].asString(); std::string address_record = device_dictionary["address"].asString(); in_addr_t address = inet_addr(address_record.c_str()); @@ -132,10 +155,8 @@ bool TcpTransportAdapter::Restore() { AddDevice(device); const Json::Value applications_dictionary = device_dictionary["applications"]; - for (Json::Value::const_iterator j = applications_dictionary.begin(); - j != applications_dictionary.end(); - ++j) { - const Json::Value application_dictionary = *j; + for (const auto& application : applications_dictionary) { + const Json::Value application_dictionary = application; std::string port_record = application_dictionary["port"].asString(); int port = atoi(port_record.c_str()); ApplicationHandle app_handle = tcp_device->AddDiscoveredApplication(port); diff --git a/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc b/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc index 9d594affe4..7d96c685f1 100644 --- a/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc +++ b/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc @@ -30,13 +30,13 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#include <algorithm> #include <errno.h> #include <fcntl.h> #include <memory.h> -#include <unistd.h> -#include <sys/types.h> #include <sys/socket.h> +#include <sys/types.h> +#include <unistd.h> +#include <algorithm> #include "utils/logger.h" #include "utils/threads/thread.h" @@ -62,7 +62,7 @@ ThreadedSocketConnection::ThreadedSocketConnection( , unexpected_disconnect_(false) , device_uid_(device_id) , app_handle_(app_handle) - , thread_(NULL) { + , thread_(nullptr) { const std::string thread_name = std::string("Socket ") + device_handle(); thread_ = threads::CreateThread(thread_name.c_str(), new SocketConnectionDelegate(this)); @@ -70,7 +70,7 @@ ThreadedSocketConnection::ThreadedSocketConnection( ThreadedSocketConnection::~ThreadedSocketConnection() { LOG4CXX_AUTO_TRACE(logger_); - DCHECK(NULL == thread_); + DCHECK(nullptr == thread_); if (-1 != read_fd_) { close(read_fd_); @@ -82,10 +82,12 @@ ThreadedSocketConnection::~ThreadedSocketConnection() { void ThreadedSocketConnection::StopAndJoinThread() { Disconnect(); - thread_->join(); - delete thread_->delegate(); - threads::DeleteThread(thread_); - thread_ = NULL; + if (thread_) { + thread_->join(); + delete thread_->delegate(); + threads::DeleteThread(thread_); + thread_ = nullptr; + } } void ThreadedSocketConnection::Abort() { @@ -167,16 +169,23 @@ TransportAdapter::Error ThreadedSocketConnection::Disconnect() { return Notify(); } +void ThreadedSocketConnection::Terminate() { + LOG4CXX_AUTO_TRACE(logger_); + StopAndJoinThread(); +} + void ThreadedSocketConnection::threadMain() { LOG4CXX_AUTO_TRACE(logger_); - ConnectError* connect_error = NULL; + ConnectError* connect_error = nullptr; if (!Establish(&connect_error)) { LOG4CXX_ERROR(logger_, "Connection Establish failed"); delete connect_error; Abort(); + } else { + LOG4CXX_DEBUG(logger_, "Connection established"); + controller_->ConnectDone(device_handle(), application_handle()); } - LOG4CXX_DEBUG(logger_, "Connection established"); - controller_->ConnectDone(device_handle(), application_handle()); + while (!terminate_flag_) { Transmit(); } @@ -300,11 +309,11 @@ bool ThreadedSocketConnection::Receive() { bytes_read = recv(socket_, buffer, sizeof(buffer), MSG_DONTWAIT); if (bytes_read > 0) { - LOG4CXX_DEBUG(logger_, - "Received " << bytes_read << " bytes for connection " - << this); + LOG4CXX_DEBUG( + logger_, + "Received " << bytes_read << " bytes for connection " << this); ::protocol_handler::RawMessagePtr frame( - new protocol_handler::RawMessage(0, 0, buffer, bytes_read)); + new protocol_handler::RawMessage(0, 0, buffer, bytes_read, false)); controller_->DataReceiveDone( device_handle(), application_handle(), frame); } else if (bytes_read < 0) { diff --git a/src/components/transport_manager/src/transport_adapter/transport_adapter_impl.cc b/src/components/transport_manager/src/transport_adapter/transport_adapter_impl.cc index bdacd68006..36f6dd98d0 100644 --- a/src/components/transport_manager/src/transport_adapter/transport_adapter_impl.cc +++ b/src/components/transport_manager/src/transport_adapter/transport_adapter_impl.cc @@ -31,37 +31,50 @@ */ #include "config_profile/profile.h" -#include "utils/logger.h" #include "utils/helpers.h" +#include "utils/logger.h" +#include "utils/timer_task_impl.h" -#include "transport_manager/transport_adapter/transport_adapter_impl.h" -#include "transport_manager/transport_adapter/transport_adapter_listener.h" +#include "transport_manager/transport_adapter/client_connection_listener.h" #include "transport_manager/transport_adapter/device_scanner.h" #include "transport_manager/transport_adapter/server_connection_factory.h" -#include "transport_manager/transport_adapter/client_connection_listener.h" +#include "transport_manager/transport_adapter/transport_adapter_impl.h" +#include "transport_manager/transport_adapter/transport_adapter_listener.h" +#ifdef WEBSOCKET_SERVER_TRANSPORT_SUPPORT +#include "transport_manager/websocket_server/websocket_device.h" +#endif namespace transport_manager { namespace transport_adapter { +const char* tc_enabled = "enabled"; +const char* tc_tcp_port = "tcp_port"; +const char* tc_tcp_ip_address = "tcp_ip_address"; + CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager") namespace { -// @deprecated DeviceTypes: PASA_AOA, PASA_BLUETOOTH, MME DeviceTypes devicesType = { std::make_pair(DeviceType::AOA, std::string("USB_AOA")), - std::make_pair(DeviceType::PASA_AOA, std::string("USB_AOA")), std::make_pair(DeviceType::BLUETOOTH, std::string("BLUETOOTH")), - std::make_pair(DeviceType::PASA_BLUETOOTH, std::string("BLUETOOTH")), - std::make_pair(DeviceType::MME, std::string("USB_IOS")), std::make_pair(DeviceType::IOS_BT, std::string("BLUETOOTH_IOS")), std::make_pair(DeviceType::IOS_USB, std::string("USB_IOS")), - std::make_pair(DeviceType::TCP, std::string("WIFI"))}; + std::make_pair(DeviceType::TCP, std::string("WIFI")), + std::make_pair(DeviceType::IOS_USB_HOST_MODE, + std::string("USB_IOS_HOST_MODE")), + std::make_pair(DeviceType::IOS_USB_DEVICE_MODE, + std::string("USB_IOS_DEVICE_MODE")), + std::make_pair(DeviceType::IOS_CARPLAY_WIRELESS, + std::string("CARPLAY_WIRELESS_IOS")), + std::make_pair(DeviceType::CLOUD_WEBSOCKET, std::string("CLOUD_WEBSOCKET")), + std::make_pair(DeviceType::WEBENGINE_WEBSOCKET, + std::string("WEBENGINE_WEBSOCKET"))}; } TransportAdapterImpl::TransportAdapterImpl( DeviceScanner* device_scanner, ServerConnectionFactory* server_connection_factory, ClientConnectionListener* client_connection_listener, - resumption::LastState& last_state, + resumption::LastStateWrapperPtr last_state_wrapper, const TransportManagerSettings& settings) : listeners_() , initialised_(0) @@ -77,7 +90,7 @@ TransportAdapterImpl::TransportAdapterImpl( device_scanner_(device_scanner) , server_connection_factory_(server_connection_factory) , client_connection_listener_(client_connection_listener) - , last_state_(last_state) + , last_state_wrapper_(last_state_wrapper) , settings_(settings) { } @@ -91,16 +104,16 @@ TransportAdapterImpl::~TransportAdapterImpl() { LOG4CXX_DEBUG(logger_, "device_scanner_ deleted."); } if (server_connection_factory_) { - LOG4CXX_DEBUG(logger_, - "Deleting server_connection_factory " - << server_connection_factory_); + LOG4CXX_DEBUG( + logger_, + "Deleting server_connection_factory " << server_connection_factory_); delete server_connection_factory_; LOG4CXX_DEBUG(logger_, "server_connection_factory deleted."); } if (client_connection_listener_) { - LOG4CXX_DEBUG(logger_, - "Deleting client_connection_listener_ " - << client_connection_listener_); + LOG4CXX_DEBUG( + logger_, + "Deleting client_connection_listener_ " << client_connection_listener_); delete client_connection_listener_; LOG4CXX_DEBUG(logger_, "client_connection_listener_ deleted."); } @@ -129,6 +142,12 @@ void TransportAdapterImpl::Terminate() { connections_lock_.AcquireForWriting(); std::swap(connections, connections_); connections_lock_.Release(); + for (const auto& connection : connections) { + auto& info = connection.second; + if (info.connection) { + info.connection->Terminate(); + } + } connections.clear(); LOG4CXX_DEBUG(logger_, "Connections deleted"); @@ -184,9 +203,9 @@ TransportAdapter::Error TransportAdapterImpl::SearchDevices() { TransportAdapter::Error TransportAdapterImpl::Connect( const DeviceUID& device_id, const ApplicationHandle& app_handle) { - LOG4CXX_TRACE(logger_, - "enter. DeviceUID " << device_id << " ApplicationHandle " - << app_handle); + LOG4CXX_TRACE( + logger_, + "enter. DeviceUID " << device_id << " ApplicationHandle " << app_handle); if (server_connection_factory_ == 0) { LOG4CXX_TRACE(logger_, "exit with NOT_SUPPORTED"); return NOT_SUPPORTED; @@ -201,17 +220,21 @@ TransportAdapter::Error TransportAdapterImpl::Connect( } connections_lock_.AcquireForWriting(); + + std::pair<DeviceUID, ApplicationHandle> connection_key = + std::make_pair(device_id, app_handle); const bool already_exists = - connections_.end() != - connections_.find(std::make_pair(device_id, app_handle)); + connections_.end() != connections_.find(connection_key); + ConnectionInfo& info = connections_[connection_key]; if (!already_exists) { - ConnectionInfo& info = connections_[std::make_pair(device_id, app_handle)]; info.app_handle = app_handle; info.device_id = device_id; info.state = ConnectionInfo::NEW; } + const bool pending_app = ConnectionInfo::PENDING == info.state; connections_lock_.Release(); - if (already_exists) { + + if (already_exists && !pending_app) { LOG4CXX_TRACE(logger_, "exit with ALREADY_EXISTS"); return ALREADY_EXISTS; } @@ -219,9 +242,9 @@ TransportAdapter::Error TransportAdapterImpl::Connect( const TransportAdapter::Error err = server_connection_factory_->CreateConnection(device_id, app_handle); if (TransportAdapter::OK != err) { - connections_lock_.AcquireForWriting(); - connections_.erase(std::make_pair(device_id, app_handle)); - connections_lock_.Release(); + if (!pending_app) { + RemoveConnection(device_id, app_handle); + } } LOG4CXX_TRACE(logger_, "exit with error: " << err); return err; @@ -233,6 +256,33 @@ TransportAdapter::Error TransportAdapterImpl::ConnectDevice( DeviceSptr device = FindDevice(device_handle); if (device) { TransportAdapter::Error err = ConnectDevice(device); + if (FAIL == err && GetDeviceType() == DeviceType::CLOUD_WEBSOCKET) { + LOG4CXX_TRACE(logger_, + "Error occurred while connecting cloud app: " << err); + // Update retry count + if (device->retry_count() >= + get_settings().cloud_app_max_retry_attempts()) { + device->reset_retry_count(); + ConnectionStatusUpdated(device, ConnectionStatus::PENDING); + return err; + } else if (device->connection_status() == ConnectionStatus::PENDING) { + ConnectionStatusUpdated(device, ConnectionStatus::RETRY); + } + + device->next_retry(); + + // Start timer for next retry + TimerSPtr retry_timer(std::make_shared<timer::Timer>( + "RetryConnectionTimer", + new timer::TimerTaskImpl<TransportAdapterImpl>( + this, &TransportAdapterImpl::RetryConnection))); + sync_primitives::AutoLock locker(retry_timer_pool_lock_); + retry_timer_pool_.push(std::make_pair(retry_timer, device_handle)); + retry_timer->Start(get_settings().cloud_app_retry_timeout(), + timer::kSingleShot); + } else if (OK == err) { + ConnectionStatusUpdated(device, ConnectionStatus::CONNECTED); + } LOG4CXX_TRACE(logger_, "exit with error: " << err); return err; } else { @@ -241,11 +291,65 @@ TransportAdapter::Error TransportAdapterImpl::ConnectDevice( } } +void TransportAdapterImpl::RetryConnection() { + ClearCompletedTimers(); + const DeviceUID device_id = GetNextRetryDevice(); + if (device_id.empty()) { + LOG4CXX_ERROR(logger_, + "Unable to find timer, ignoring RetryConnection request"); + return; + } + ConnectDevice(device_id); +} + +void TransportAdapterImpl::ClearCompletedTimers() { + // Cleanup any retry timers which have completed execution + sync_primitives::AutoLock locker(completed_timer_pool_lock_); + while (!completed_timer_pool_.empty()) { + auto timer_entry = completed_timer_pool_.front(); + if (timer_entry.first->is_completed()) { + completed_timer_pool_.pop(); + } + } +} + +DeviceUID TransportAdapterImpl::GetNextRetryDevice() { + sync_primitives::AutoLock retry_locker(retry_timer_pool_lock_); + if (retry_timer_pool_.empty()) { + return std::string(); + } + auto timer_entry = retry_timer_pool_.front(); + retry_timer_pool_.pop(); + + // Store reference for cleanup later + sync_primitives::AutoLock completed_locker(completed_timer_pool_lock_); + completed_timer_pool_.push(timer_entry); + + return timer_entry.second; +} + +ConnectionStatus TransportAdapterImpl::GetConnectionStatus( + const DeviceUID& device_handle) const { + DeviceSptr device = FindDevice(device_handle); + return device.use_count() == 0 ? ConnectionStatus::INVALID + : device->connection_status(); +} + +void TransportAdapterImpl::ConnectionStatusUpdated(DeviceSptr device, + ConnectionStatus status) { + device->set_connection_status(status); + for (TransportAdapterListenerList::iterator it = listeners_.begin(); + it != listeners_.end(); + ++it) { + (*it)->OnConnectionStatusUpdated(this); + } +} + TransportAdapter::Error TransportAdapterImpl::Disconnect( const DeviceUID& device_id, const ApplicationHandle& app_handle) { - LOG4CXX_TRACE(logger_, - "enter. device_id: " << &device_id - << ", device_id: " << &device_id); + LOG4CXX_TRACE( + logger_, + "enter. device_id: " << &device_id << ", device_id: " << &device_id); if (!initialised_) { LOG4CXX_TRACE(logger_, "exit with BAD_STATE"); return BAD_STATE; @@ -270,6 +374,12 @@ TransportAdapter::Error TransportAdapterImpl::DisconnectDevice( } Error error = OK; + DeviceSptr device = FindDevice(device_id); + if (!device) { + LOG4CXX_WARN(logger_, "Device with id: " << device_id << " Not found"); + return BAD_PARAM; + } + ConnectionStatusUpdated(device, ConnectionStatus::CLOSING); std::vector<ConnectionInfo> to_disconnect; connections_lock_.AcquireForReading(); @@ -320,8 +430,9 @@ TransportAdapter::Error TransportAdapterImpl::SendData( } } -TransportAdapter::Error TransportAdapterImpl::StartClientListening() { - LOG4CXX_TRACE(logger_, "enter"); +TransportAdapter::Error TransportAdapterImpl::ChangeClientListening( + TransportAction required_change) { + LOG4CXX_AUTO_TRACE(logger_); if (client_connection_listener_ == 0) { LOG4CXX_TRACE(logger_, "exit with NOT_SUPPORTED"); return NOT_SUPPORTED; @@ -330,27 +441,43 @@ TransportAdapter::Error TransportAdapterImpl::StartClientListening() { LOG4CXX_TRACE(logger_, "exit with BAD_STATE"); return BAD_STATE; } - TransportAdapter::Error err = client_connection_listener_->StartListening(); - LOG4CXX_TRACE(logger_, "exit with error: " << err); - return err; -} -TransportAdapter::Error TransportAdapterImpl::StopClientListening() { - LOG4CXX_TRACE(logger_, "enter"); - if (client_connection_listener_ == 0) { - LOG4CXX_TRACE(logger_, "exit with NOT_SUPPORTED"); - return NOT_SUPPORTED; - } - if (!client_connection_listener_->IsInitialised()) { - LOG4CXX_TRACE(logger_, "exit with BAD_STATE"); - return BAD_STATE; - } - TransportAdapter::Error err = client_connection_listener_->StopListening(); - sync_primitives::AutoLock locker(devices_mutex_); - for (DeviceMap::iterator it = devices_.begin(); it != devices_.end(); ++it) { - it->second->Stop(); + TransportAdapter::Error err = TransportAdapter::Error::UNKNOWN; + + switch (required_change) { + case transport_manager::TransportAction::kVisibilityOn: + err = client_connection_listener_->StartListening(); + break; + + case transport_manager::TransportAction::kListeningOn: + err = client_connection_listener_->ResumeListening(); + break; + + case transport_manager::TransportAction::kListeningOff: + err = client_connection_listener_->SuspendListening(); + { + sync_primitives::AutoLock locker(devices_mutex_); + for (DeviceMap::iterator it = devices_.begin(); it != devices_.end(); + ++it) { + it->second->Stop(); + } + } + break; + + case transport_manager::TransportAction::kVisibilityOff: + err = client_connection_listener_->StopListening(); + { + sync_primitives::AutoLock locker(devices_mutex_); + for (DeviceMap::iterator it = devices_.begin(); it != devices_.end(); + ++it) { + it->second->Stop(); + } + } + break; + default: + NOTREACHED(); } - LOG4CXX_TRACE(logger_, "exit with error: " << err); + LOG4CXX_TRACE(logger_, "Exit with error: " << err); return err; } @@ -367,6 +494,32 @@ DeviceList TransportAdapterImpl::GetDeviceList() const { return devices; } +DeviceSptr TransportAdapterImpl::GetWebEngineDevice() const { +#ifndef WEBSOCKET_SERVER_TRANSPORT_SUPPORT + LOG4CXX_TRACE(logger_, + "Web engine support is disabled. Device does not exist"); + return DeviceSptr(); +#else + LOG4CXX_AUTO_TRACE(logger_); + sync_primitives::AutoLock locker(devices_mutex_); + + auto web_engine_device = + std::find_if(devices_.begin(), + devices_.end(), + [](const std::pair<DeviceUID, DeviceSptr> device) { + return webengine_constants::kWebEngineDeviceName == + device.second->name(); + }); + + if (devices_.end() != web_engine_device) { + return web_engine_device->second; + } + + LOG4CXX_ERROR(logger_, "WebEngine device not found!"); + return std::make_shared<transport_adapter::WebSocketDevice>("", ""); +#endif +} + DeviceSptr TransportAdapterImpl::AddDevice(DeviceSptr device) { LOG4CXX_TRACE(logger_, "enter. device: " << device); DeviceSptr existing_device; @@ -389,6 +542,7 @@ DeviceSptr TransportAdapterImpl::AddDevice(DeviceSptr device) { LOG4CXX_TRACE(logger_, "exit with TRUE. Condition: same_device_found"); return existing_device; } else { + device->set_connection_status(ConnectionStatus::PENDING); for (TransportAdapterListenerList::iterator it = listeners_.begin(); it != listeners_.end(); ++it) { @@ -539,9 +693,9 @@ void TransportAdapterImpl::ConnectionCreated( void TransportAdapterImpl::DeviceDisconnected( const DeviceUID& device_handle, const DisconnectDeviceError& error) { const DeviceUID device_uid = device_handle; - LOG4CXX_TRACE(logger_, - "enter. device_handle: " << &device_uid - << ", error: " << &error); + LOG4CXX_TRACE( + logger_, + "enter. device_handle: " << &device_uid << ", error: " << &error); ApplicationList app_list = GetApplicationList(device_uid); for (ApplicationList::const_iterator i = app_list.begin(); i != app_list.end(); @@ -563,14 +717,12 @@ void TransportAdapterImpl::DeviceDisconnected( listener->OnDisconnectDeviceDone(this, device_uid); } - connections_lock_.AcquireForWriting(); for (ApplicationList::const_iterator i = app_list.begin(); i != app_list.end(); ++i) { ApplicationHandle app_handle = *i; - connections_.erase(std::make_pair(device_uid, app_handle)); + RemoveConnection(device_uid, app_handle); } - connections_lock_.Release(); RemoveDevice(device_uid); LOG4CXX_TRACE(logger_, "exit"); @@ -599,9 +751,9 @@ void TransportAdapterImpl::DisconnectDone(const DeviceUID& device_handle, const ApplicationHandle& app_handle) { const DeviceUID device_uid = device_handle; const ApplicationHandle app_uid = app_handle; - LOG4CXX_TRACE(logger_, - "enter. device_id: " << &device_uid - << ", app_handle: " << &app_uid); + LOG4CXX_TRACE( + logger_, + "enter. device_id: " << &device_uid << ", app_handle: " << &app_uid); DeviceSptr device = FindDevice(device_handle); if (!device) { LOG4CXX_WARN(logger_, "Device: uid " << &device_uid << " not found"); @@ -620,9 +772,7 @@ void TransportAdapterImpl::DisconnectDone(const DeviceUID& device_handle, listener->OnDisconnectDeviceDone(this, device_uid); } } - connections_lock_.AcquireForWriting(); - connections_.erase(std::make_pair(device_uid, app_uid)); - connections_lock_.Release(); + RemoveConnection(device_uid, app_uid); if (device_disconnected) { RemoveDevice(device_uid); @@ -695,6 +845,16 @@ void TransportAdapterImpl::DataSendFailed( LOG4CXX_TRACE(logger_, "exit"); } +void TransportAdapterImpl::TransportConfigUpdated( + const TransportConfig& new_config) { + LOG4CXX_AUTO_TRACE(logger_); + for (TransportAdapterListenerList::iterator it = listeners_.begin(); + it != listeners_.end(); + ++it) { + (*it)->OnTransportConfigUpdated(this); + } +} + void TransportAdapterImpl::DoTransportSwitch() const { LOG4CXX_AUTO_TRACE(logger_); std::for_each( @@ -712,6 +872,26 @@ void TransportAdapterImpl::DeviceSwitched(const DeviceUID& device_handle) { UNUSED(device_handle); } +ConnectionSPtr TransportAdapterImpl::FindPendingConnection( + const DeviceUID& device_id, const ApplicationHandle& app_handle) const { + LOG4CXX_TRACE( + logger_, + "enter. device_id: " << &device_id << ", app_handle: " << &app_handle); + ConnectionSPtr connection; + connections_lock_.AcquireForReading(); + ConnectionMap::const_iterator it = + connections_.find(std::make_pair(device_id, app_handle)); + if (it != connections_.end()) { + const ConnectionInfo& info = it->second; + if (info.state == ConnectionInfo::PENDING) { + connection = info.connection; + } + } + connections_lock_.Release(); + LOG4CXX_TRACE(logger_, "exit with Connection: " << connection); + return connection; +} + DeviceSptr TransportAdapterImpl::FindDevice(const DeviceUID& device_id) const { LOG4CXX_TRACE(logger_, "enter. device_id: " << &device_id); DeviceSptr ret; @@ -727,11 +907,39 @@ DeviceSptr TransportAdapterImpl::FindDevice(const DeviceUID& device_id) const { return ret; } +void TransportAdapterImpl::ConnectPending(const DeviceUID& device_id, + const ApplicationHandle& app_handle) { + LOG4CXX_AUTO_TRACE(logger_); + connections_lock_.AcquireForWriting(); + ConnectionMap::iterator it_conn = + connections_.find(std::make_pair(device_id, app_handle)); + if (it_conn != connections_.end()) { + ConnectionInfo& info = it_conn->second; + info.state = ConnectionInfo::PENDING; + } + connections_lock_.Release(); + + DeviceSptr device = FindDevice(device_id); + if (device.use_count() == 0) { + LOG4CXX_ERROR( + logger_, "Unable to find device, cannot set connection pending status"); + return; + } else { + device->set_connection_status(ConnectionStatus::PENDING); + } + + for (TransportAdapterListenerList::iterator it = listeners_.begin(); + it != listeners_.end(); + ++it) { + (*it)->OnConnectPending(this, device_id, app_handle); + } +} + void TransportAdapterImpl::ConnectDone(const DeviceUID& device_id, const ApplicationHandle& app_handle) { - LOG4CXX_TRACE(logger_, - "enter. device_id: " << &device_id - << ", app_handle: " << &app_handle); + LOG4CXX_TRACE( + logger_, + "enter. device_id: " << &device_id << ", app_handle: " << &app_handle); connections_lock_.AcquireForReading(); ConnectionMap::iterator it_conn = connections_.find(std::make_pair(device_id, app_handle)); @@ -759,9 +967,7 @@ void TransportAdapterImpl::ConnectFailed(const DeviceUID& device_handle, LOG4CXX_TRACE(logger_, "enter. device_id: " << &device_uid << ", app_handle: " << &app_uid << ", error: " << &error); - connections_lock_.AcquireForWriting(); - connections_.erase(std::make_pair(device_uid, app_uid)); - connections_lock_.Release(); + RemoveConnection(device_uid, app_uid); for (TransportAdapterListenerList::iterator it = listeners_.begin(); it != listeners_.end(); ++it) { @@ -774,23 +980,70 @@ void TransportAdapterImpl::RemoveFinalizedConnection( const DeviceUID& device_handle, const ApplicationHandle& app_handle) { const DeviceUID device_uid = device_handle; LOG4CXX_AUTO_TRACE(logger_); - sync_primitives::AutoWriteLock lock(connections_lock_); - ConnectionMap::iterator it_conn = - connections_.find(std::make_pair(device_uid, app_handle)); - if (it_conn == connections_.end()) { - LOG4CXX_WARN(logger_, - "Device_id: " << &device_uid << ", app_handle: " << &app_handle - << " connection not found"); - return; + { + connections_lock_.AcquireForWriting(); + auto it_conn = connections_.find(std::make_pair(device_uid, app_handle)); + if (connections_.end() == it_conn) { + LOG4CXX_WARN(logger_, + "Device_id: " << &device_uid << ", app_handle: " + << &app_handle << " connection not found"); + connections_lock_.Release(); + return; + } + const ConnectionInfo& info = it_conn->second; + if (ConnectionInfo::FINALISING != info.state) { + LOG4CXX_WARN(logger_, + "Device_id: " << &device_uid << ", app_handle: " + << &app_handle << " connection not finalized"); + connections_lock_.Release(); + return; + } + // By copying the info.connection shared pointer into this local variable, + // we can delay the connection's destructor until after + // connections_lock_.Release. + LOG4CXX_DEBUG( + logger_, + "RemoveFinalizedConnection copying connection with Device_id: " + << &device_uid << ", app_handle: " << &app_handle); + ConnectionSPtr connection = info.connection; + connections_.erase(it_conn); + connections_lock_.Release(); + LOG4CXX_DEBUG(logger_, + "RemoveFinalizedConnection Connections Lock Released"); } - const ConnectionInfo& info = it_conn->second; - if (info.state != ConnectionInfo::FINALISING) { - LOG4CXX_WARN(logger_, - "Device_id: " << &device_uid << ", app_handle: " << &app_handle - << " connection not finalized"); + + DeviceSptr device = FindDevice(device_handle); + if (!device) { + LOG4CXX_WARN(logger_, "Device: uid " << &device_uid << " not found"); return; } - connections_.erase(it_conn); + + if (ToBeAutoDisconnected(device) && + IsSingleApplication(device_handle, app_handle)) { + RemoveDevice(device_uid); + } +} + +void TransportAdapterImpl::RemoveConnection( + const DeviceUID& device_id, const ApplicationHandle& app_handle) { + ConnectionSPtr connection; + connections_lock_.AcquireForWriting(); + ConnectionMap::const_iterator it = + connections_.find(std::make_pair(device_id, app_handle)); + if (it != connections_.end()) { + // By copying the connection from the map to this shared pointer, + // we can erase the object from the map without triggering the destructor + LOG4CXX_DEBUG(logger_, + "Copying connection with Device_id: " + << &device_id << ", app_handle: " << &app_handle); + connection = it->second.connection; + connections_.erase(it); + } + connections_lock_.Release(); + LOG4CXX_DEBUG(logger_, "Connections Lock Released"); + + // And now, "connection" goes out of scope, triggering the destructor outside + // of the "connections_lock_" } void TransportAdapterImpl::AddListener(TransportAdapterListener* listener) { @@ -803,24 +1056,24 @@ ApplicationList TransportAdapterImpl::GetApplicationList( const DeviceUID& device_id) const { LOG4CXX_TRACE(logger_, "enter. device_id: " << &device_id); DeviceSptr device = FindDevice(device_id); - if (device.valid()) { + if (device.use_count() != 0) { ApplicationList lst = device->GetApplicationList(); LOG4CXX_TRACE(logger_, "exit with ApplicationList. It's size = " - << lst.size() << " Condition: device.valid()"); + << lst.size() << " Condition: device.use_count() != 0"); return lst; } - LOG4CXX_TRACE( - logger_, - "exit with empty ApplicationList. Condition: NOT device.valid()"); + LOG4CXX_TRACE(logger_, + "exit with empty ApplicationList. Condition: NOT " + "device.use_count() != 0"); return ApplicationList(); } void TransportAdapterImpl::ConnectionFinished( const DeviceUID& device_id, const ApplicationHandle& app_handle) { - LOG4CXX_TRACE(logger_, - "enter. device_id: " << &device_id - << ", app_handle: " << &app_handle); + LOG4CXX_TRACE( + logger_, + "enter. device_id: " << &device_id << ", app_handle: " << &app_handle); connections_lock_.AcquireForReading(); ConnectionMap::iterator it = connections_.find(std::make_pair(device_id, app_handle)); @@ -875,7 +1128,7 @@ bool TransportAdapterImpl::IsInitialised() const { std::string TransportAdapterImpl::DeviceName(const DeviceUID& device_id) const { DeviceSptr device = FindDevice(device_id); - if (device.valid()) { + if (device.use_count() != 0) { return device->name(); } else { return ""; @@ -944,9 +1197,9 @@ bool TransportAdapterImpl::ToBeAutoDisconnected(DeviceSptr device) const { ConnectionSPtr TransportAdapterImpl::FindEstablishedConnection( const DeviceUID& device_id, const ApplicationHandle& app_handle) const { - LOG4CXX_TRACE(logger_, - "enter. device_id: " << &device_id - << ", app_handle: " << &app_handle); + LOG4CXX_TRACE( + logger_, + "enter. device_id: " << &device_id << ", app_handle: " << &app_handle); ConnectionSPtr connection; connections_lock_.AcquireForReading(); ConnectionMap::const_iterator it = @@ -1020,12 +1273,13 @@ void TransportAdapterImpl::RunAppOnDevice(const DeviceUID& device_uid, void TransportAdapterImpl::RemoveDevice(const DeviceUID& device_handle) { LOG4CXX_AUTO_TRACE(logger_); - LOG4CXX_DEBUG(logger_, "Device_handle: " << &device_handle); + LOG4CXX_DEBUG(logger_, "Remove Device_handle: " << &device_handle); sync_primitives::AutoLock locker(devices_mutex_); DeviceMap::iterator i = devices_.find(device_handle); if (i != devices_.end()) { DeviceSptr device = i->second; - if (!device->keep_on_disconnect()) { + bool is_cloud_device = (GetDeviceType() == DeviceType::CLOUD_WEBSOCKET); + if (!device->keep_on_disconnect() || is_cloud_device) { devices_.erase(i); for (TransportAdapterListenerList::iterator it = listeners_.begin(); it != listeners_.end(); diff --git a/src/components/transport_manager/src/transport_adapter/transport_adapter_listener_impl.cc b/src/components/transport_manager/src/transport_adapter/transport_adapter_listener_impl.cc index f1181ce921..bf2d3dbdf4 100644 --- a/src/components/transport_manager/src/transport_adapter/transport_adapter_listener_impl.cc +++ b/src/components/transport_manager/src/transport_adapter/transport_adapter_listener_impl.cc @@ -34,9 +34,9 @@ #include "utils/logger.h" +#include "transport_manager/transport_adapter/transport_adapter_event.h" #include "transport_manager/transport_adapter/transport_adapter_listener_impl.h" #include "transport_manager/transport_manager_impl.h" -#include "transport_manager/transport_adapter/transport_adapter_event.h" namespace transport_manager { CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager") @@ -116,6 +116,44 @@ void TransportAdapterListenerImpl::OnFindNewApplicationsRequest( LOG4CXX_TRACE(logger_, "exit"); } +void TransportAdapterListenerImpl::OnConnectionStatusUpdated( + const TransportAdapter* adapter) { + LOG4CXX_TRACE(logger_, "enter. adapter* " << adapter); + const TransportAdapterEvent event(EventTypeEnum::ON_CONNECTION_STATUS_UPDATED, + transport_adapter_, + "", + 0, + ::protocol_handler::RawMessagePtr(), + BaseErrorPtr(new BaseError())); + if (transport_manager_ != NULL && + transport_manager::E_SUCCESS != + transport_manager_->ReceiveEventFromDevice(event)) { + LOG4CXX_WARN(logger_, "Failed to receive event from device"); + } + LOG4CXX_TRACE(logger_, "exit"); +} + +void TransportAdapterListenerImpl::OnConnectPending( + const TransportAdapter* adapter, + const DeviceUID& device, + const ApplicationHandle& application_id) { + LOG4CXX_TRACE(logger_, + "enter adapter*: " << adapter << ", device: " << &device + << ", application_id: " << &application_id); + const TransportAdapterEvent event(EventTypeEnum::ON_CONNECT_PENDING, + transport_adapter_, + device, + application_id, + ::protocol_handler::RawMessagePtr(), + BaseErrorPtr(new BaseError())); + if (transport_manager_ != NULL && + transport_manager::E_SUCCESS != + transport_manager_->ReceiveEventFromDevice(event)) { + LOG4CXX_WARN(logger_, "Failed to receive event from device"); + } + LOG4CXX_TRACE(logger_, "exit"); +} + void TransportAdapterListenerImpl::OnConnectDone( const TransportAdapter* adapter, const DeviceUID& device, @@ -275,7 +313,7 @@ void TransportAdapterListenerImpl::OnDataSendDone( device, app_id, data_container, - new BaseError()); + std::make_shared<BaseError>()); if (transport_manager_ != NULL && transport_manager::E_SUCCESS != transport_manager_->ReceiveEventFromDevice(event)) { @@ -376,4 +414,23 @@ void TransportAdapterListenerImpl::OnTransportSwitchRequested( LOG4CXX_WARN(logger_, "Failed to receive event from device"); } } + +void TransportAdapterListenerImpl::OnTransportConfigUpdated( + const transport_adapter::TransportAdapter* adapter) { + LOG4CXX_AUTO_TRACE(logger_); + + const TransportAdapterEvent event(EventTypeEnum::ON_TRANSPORT_CONFIG_UPDATED, + transport_adapter_, + "", + 0, + ::protocol_handler::RawMessagePtr(), + BaseErrorPtr()); + + if (transport_manager_ != NULL && + transport_manager::E_SUCCESS != + transport_manager_->ReceiveEventFromDevice(event)) { + LOG4CXX_WARN(logger_, "Failed to receive event from device"); + } +} + } // namespace transport_manager diff --git a/src/components/transport_manager/src/transport_manager_default.cc b/src/components/transport_manager/src/transport_manager_default.cc index 196ad09af4..74c5b9ceee 100644 --- a/src/components/transport_manager/src/transport_manager_default.cc +++ b/src/components/transport_manager/src/transport_manager_default.cc @@ -44,84 +44,156 @@ #include "transport_manager/usb/usb_aoa_adapter.h" #endif // USB_SUPPORT -#if defined(BUILD_TESTS) +#if defined(CLOUD_APP_WEBSOCKET_TRANSPORT_SUPPORT) +#include "transport_manager/cloud/cloud_websocket_transport_adapter.h" +#endif // CLOUD_APP_WEBSOCKET_TRANSPORT_SUPPORT + +#ifdef WEBSOCKET_SERVER_TRANSPORT_SUPPORT +#include "transport_manager/websocket_server/websocket_server_transport_adapter.h" +#endif + +#if defined(ENABLE_IAP2EMULATION) #include "transport_manager/iap2_emulation/iap2_transport_adapter.h" -#endif // BUILD_TEST +#endif // ENABLE_IAP2EMULATION namespace transport_manager { CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager") +TransportAdapterFactory::TransportAdapterFactory() { +#ifdef BLUETOOTH_SUPPORT + ta_bluetooth_creator_ = [](resumption::LastStateWrapperPtr last_state_wrapper, + const TransportManagerSettings& settings) { + return new transport_adapter::BluetoothTransportAdapter(last_state_wrapper, + settings); + }; +#endif + ta_tcp_creator_ = [](const uint16_t port, + resumption::LastStateWrapperPtr last_state_wrapper, + const TransportManagerSettings& settings) { + return new transport_adapter::TcpTransportAdapter( + port, last_state_wrapper, settings); + }; +#if defined(USB_SUPPORT) + ta_usb_creator_ = [](resumption::LastStateWrapperPtr last_state_wrapper, + const TransportManagerSettings& settings) { + return new transport_adapter::UsbAoaAdapter(last_state_wrapper, settings); + }; +#endif +#if defined(CLOUD_APP_WEBSOCKET_TRANSPORT_SUPPORT) + ta_cloud_creator_ = [](resumption::LastStateWrapperPtr last_state_wrapper, + const TransportManagerSettings& settings) { + return new transport_adapter::CloudWebsocketTransportAdapter( + last_state_wrapper, settings); + }; +#endif + +#if defined(WEBSOCKET_SERVER_TRANSPORT_SUPPORT) + ta_websocket_server_creator_ = + [](resumption::LastStateWrapperPtr last_state_wrapper, + const TransportManagerSettings& settings) { + return new transport_adapter::WebSocketServerTransportAdapter( + last_state_wrapper, settings); + }; +#endif +} + TransportManagerDefault::TransportManagerDefault( - const TransportManagerSettings& settings) - : TransportManagerImpl(settings) {} + const TransportManagerSettings& settings, + const TransportAdapterFactory& ta_factory) + : TransportManagerImpl(settings), ta_factory_(ta_factory) {} -int TransportManagerDefault::Init(resumption::LastState& last_state) { +int TransportManagerDefault::Init( + resumption::LastStateWrapperPtr last_state_wrapper) { LOG4CXX_TRACE(logger_, "enter"); - if (E_SUCCESS != TransportManagerImpl::Init(last_state)) { + if (E_SUCCESS != TransportManagerImpl::Init(last_state_wrapper)) { LOG4CXX_TRACE(logger_, "exit with E_TM_IS_NOT_INITIALIZED. Condition: E_SUCCESS != " "TransportManagerImpl::Init()"); return E_TM_IS_NOT_INITIALIZED; } -#ifdef BLUETOOTH_SUPPORT - transport_adapter::TransportAdapterImpl* ta_bluetooth = - new transport_adapter::BluetoothTransportAdapter(last_state, - get_settings()); + const auto& settings = get_settings(); + +#if defined(BLUETOOTH_SUPPORT) + auto ta_bluetooth = + ta_factory_.ta_bluetooth_creator_(last_state_wrapper, settings); #ifdef TELEMETRY_MONITOR if (metric_observer_) { ta_bluetooth->SetTelemetryObserver(metric_observer_); } #endif // TELEMETRY_MONITOR AddTransportAdapter(ta_bluetooth); - ta_bluetooth = NULL; -#endif +#endif // BLUETOOTH_SUPPORT - const uint16_t port = get_settings().transport_manager_tcp_adapter_port(); - transport_adapter::TransportAdapterImpl* ta_tcp = - new transport_adapter::TcpTransportAdapter( - port, last_state, get_settings()); + auto ta_tcp = + ta_factory_.ta_tcp_creator_(settings.transport_manager_tcp_adapter_port(), + last_state_wrapper, + settings); #ifdef TELEMETRY_MONITOR if (metric_observer_) { ta_tcp->SetTelemetryObserver(metric_observer_); } #endif // TELEMETRY_MONITOR AddTransportAdapter(ta_tcp); - ta_tcp = NULL; #if defined(USB_SUPPORT) - transport_adapter::TransportAdapterImpl* ta_usb = - new transport_adapter::UsbAoaAdapter(last_state, get_settings()); + auto ta_usb = ta_factory_.ta_usb_creator_(last_state_wrapper, settings); #ifdef TELEMETRY_MONITOR if (metric_observer_) { ta_usb->SetTelemetryObserver(metric_observer_); } #endif // TELEMETRY_MONITOR AddTransportAdapter(ta_usb); - ta_usb = NULL; #endif // USB_SUPPORT -#if defined BUILD_TESTS +#if defined(CLOUD_APP_WEBSOCKET_TRANSPORT_SUPPORT) + auto ta_cloud = ta_factory_.ta_cloud_creator_(last_state_wrapper, settings); +#ifdef TELEMETRY_MONITOR + if (metric_observer_) { + ta_cloud->SetTelemetryObserver(metric_observer_); + } +#endif // TELEMETRY_MONITOR + AddTransportAdapter(ta_cloud); +#endif // CLOUD_APP_WEBSOCKET_TRANSPORT_SUPPORT + +#ifdef WEBSOCKET_SERVER_TRANSPORT_SUPPORT + auto ta_websocket = + ta_factory_.ta_websocket_server_creator_(last_state_wrapper, settings); + +#ifdef TELEMETRY_MONITOR + if (metric_observer_) { + ta_websocket->SetTelemetryObserver(metric_observer_); + } +#endif // TELEMETRY_MONITOR + AddTransportAdapter(ta_websocket); + ta_websocket = NULL; +#endif // WEBSOCKET_SERVER_TRANSPORT_SUPPORT + +#if defined ENABLE_IAP2EMULATION const uint16_t iap2_bt_emu_port = 23456; transport_adapter::IAP2BluetoothEmulationTransportAdapter* iap2_bt_emu_adapter = new transport_adapter::IAP2BluetoothEmulationTransportAdapter( - iap2_bt_emu_port, last_state, get_settings()); + iap2_bt_emu_port, last_state_wrapper, settings); AddTransportAdapter(iap2_bt_emu_adapter); const uint16_t iap2_usb_emu_port = 34567; transport_adapter::IAP2USBEmulationTransportAdapter* iap2_usb_emu_adapter = new transport_adapter::IAP2USBEmulationTransportAdapter( - iap2_usb_emu_port, last_state, get_settings()); + iap2_usb_emu_port, last_state_wrapper, settings); AddTransportAdapter(iap2_usb_emu_adapter); -#endif // BUILD_TEST +#endif // ENABLE_IAP2EMULATION LOG4CXX_TRACE(logger_, "exit with E_SUCCESS"); return E_SUCCESS; } +int TransportManagerDefault::Init(resumption::LastState&) { + return 1; +} + TransportManagerDefault::~TransportManagerDefault() {} } // namespace transport_manager diff --git a/src/components/transport_manager/src/transport_manager_impl.cc b/src/components/transport_manager/src/transport_manager_impl.cc index a364220a64..ddff4f3780 100644 --- a/src/components/transport_manager/src/transport_manager_impl.cc +++ b/src/components/transport_manager/src/transport_manager_impl.cc @@ -33,25 +33,32 @@ #include "transport_manager/transport_manager_impl.h" #include <stdint.h> +#include <algorithm> #include <cstring> +#include <functional> +#include <iostream> +#include <limits> #include <queue> #include <set> -#include <algorithm> -#include <limits> -#include <functional> #include <sstream> -#include <iostream> -#include "utils/macro.h" #include "utils/logger.h" -#include "utils/make_shared.h" -#include "utils/timer_task_impl.h" +#include "utils/macro.h" + +#include "config_profile/profile.h" +#if defined(CLOUD_APP_WEBSOCKET_TRANSPORT_SUPPORT) +#include "transport_manager/cloud/cloud_websocket_transport_adapter.h" +#endif #include "transport_manager/common.h" -#include "transport_manager/transport_manager_listener.h" -#include "transport_manager/transport_manager_listener_empty.h" #include "transport_manager/transport_adapter/transport_adapter.h" #include "transport_manager/transport_adapter/transport_adapter_event.h" -#include "config_profile/profile.h" +#include "transport_manager/transport_manager_listener.h" +#include "transport_manager/transport_manager_listener_empty.h" +#ifdef WEBSOCKET_SERVER_TRANSPORT_SUPPORT +#include "transport_manager/websocket_server/websocket_device.h" +#include "transport_manager/websocket_server/websocket_server_transport_adapter.h" +#endif +#include "utils/timer_task_impl.h" using ::transport_manager::transport_adapter::TransportAdapter; @@ -64,6 +71,7 @@ struct ConnectionFinder { return id_ == connection.id; } }; + } // namespace namespace transport_manager { @@ -97,7 +105,14 @@ TransportManagerImpl::TransportManagerImpl( , device_switch_timer_( "Device reconection timer", new timer::TimerTaskImpl<TransportManagerImpl>( - this, &TransportManagerImpl::ReconnectionTimeout)) { + this, &TransportManagerImpl::ReconnectionTimeout)) + , events_processing_is_active_(true) + , events_processing_lock_() + , events_processing_cond_var_() + , web_engine_device_info_(0, + "", + webengine_constants::kWebEngineDeviceName, + webengine_constants::kWebEngineConnectionType) { LOG4CXX_TRACE(logger_, "TransportManager has created"); } @@ -129,6 +144,48 @@ void TransportManagerImpl::ReconnectionTimeout() { device_to_reconnect_); } +void TransportManagerImpl::AddCloudDevice( + const transport_manager::transport_adapter::CloudAppProperties& + cloud_properties) { +#if !defined(CLOUD_APP_WEBSOCKET_TRANSPORT_SUPPORT) + LOG4CXX_TRACE(logger_, "Cloud app support is disabled. Exiting function"); +#else + transport_adapter::DeviceType type = transport_adapter::DeviceType::UNKNOWN; + if (cloud_properties.cloud_transport_type == "WS") { + type = transport_adapter::DeviceType::CLOUD_WEBSOCKET; + } +#ifdef ENABLE_SECURITY + else if (cloud_properties.cloud_transport_type == "WSS") { + type = transport_adapter::DeviceType::CLOUD_WEBSOCKET; + } +#endif // ENABLE_SECURITY + else { + return; + } + + std::vector<TransportAdapter*>::iterator ta = transport_adapters_.begin(); + for (; ta != transport_adapters_.end(); ++ta) { + if ((*ta)->GetDeviceType() == type) { + (*ta)->CreateDevice(cloud_properties.endpoint); + transport_adapter::CloudWebsocketTransportAdapter* cta = + static_cast<transport_adapter::CloudWebsocketTransportAdapter*>(*ta); + cta->SetAppCloudTransportConfig(cloud_properties.endpoint, + cloud_properties); + } + } +#endif // CLOUD_APP_WEBSOCKET_TRANSPORT_SUPPORT + return; +} + +void TransportManagerImpl::RemoveCloudDevice(const DeviceHandle device_handle) { +#if !defined(CLOUD_APP_WEBSOCKET_TRANSPORT_SUPPORT) + LOG4CXX_TRACE(logger_, "Cloud app support is disabled. Exiting function"); + return; +#else + DisconnectDevice(device_handle); +#endif // CLOUD_APP_WEBSOCKET_TRANSPORT_SUPPORT +} + int TransportManagerImpl::ConnectDevice(const DeviceHandle device_handle) { LOG4CXX_TRACE(logger_, "enter. DeviceHandle: " << &device_handle); if (!this->is_initialized_) { @@ -157,6 +214,22 @@ int TransportManagerImpl::ConnectDevice(const DeviceHandle device_handle) { return err; } +ConnectionStatus TransportManagerImpl::GetConnectionStatus( + const DeviceHandle& device_handle) const { + DeviceUID device_id = converter_.HandleToUid(device_handle); + + sync_primitives::AutoReadLock lock(device_to_adapter_map_lock_); + DeviceToAdapterMap::const_iterator it = + device_to_adapter_map_.find(device_id); + if (it == device_to_adapter_map_.end()) { + LOG4CXX_ERROR(logger_, "No device adapter found by id " << device_handle); + LOG4CXX_TRACE(logger_, "exit with E_INVALID_HANDLE. Condition: NULL == ta"); + return ConnectionStatus::INVALID; + } + transport_adapter::TransportAdapter* ta = it->second; + return ta->GetConnectionStatus(device_id); +} + int TransportManagerImpl::DisconnectDevice(const DeviceHandle device_handle) { LOG4CXX_TRACE(logger_, "enter. DeviceHandle: " << &device_handle); if (!this->is_initialized_) { @@ -318,9 +391,9 @@ int TransportManagerImpl::Stop() { int TransportManagerImpl::SendMessageToDevice( const ::protocol_handler::RawMessagePtr message) { LOG4CXX_TRACE(logger_, "enter. RawMessageSptr: " << message); - LOG4CXX_INFO(logger_, - "Send message to device called with arguments " - << message.get()); + LOG4CXX_INFO( + logger_, + "Send message to device called with arguments " << message.get()); if (false == this->is_initialized_) { LOG4CXX_ERROR(logger_, "TM is not initialized."); LOG4CXX_TRACE(logger_, @@ -516,8 +589,18 @@ int TransportManagerImpl::SearchDevices() { return transport_adapter_search; } +int TransportManagerImpl::Init( + resumption::LastStateWrapperPtr last_state_wrapper) { + // Last state wrapper required to initialize Transport adapters + UNUSED(last_state_wrapper); + LOG4CXX_TRACE(logger_, "enter"); + is_initialized_ = true; + LOG4CXX_TRACE(logger_, "exit with E_SUCCESS"); + return E_SUCCESS; +} + int TransportManagerImpl::Init(resumption::LastState& last_state) { - // Last state requred to initialize Transport adapters + // Last state required to initialize Transport adapters UNUSED(last_state); LOG4CXX_TRACE(logger_, "enter"); is_initialized_ = true; @@ -525,19 +608,36 @@ int TransportManagerImpl::Init(resumption::LastState& last_state) { return E_SUCCESS; } -int TransportManagerImpl::Reinit() { +void TransportManagerImpl::Deinit() { LOG4CXX_AUTO_TRACE(logger_); DisconnectAllDevices(); TerminateAllAdapters(); + device_to_adapter_map_.clear(); + connection_id_counter_ = 0; +} + +int TransportManagerImpl::Reinit() { int ret = InitAllAdapters(); return ret; } -int TransportManagerImpl::Visibility(const bool& on_off) const { - LOG4CXX_TRACE(logger_, "enter. On_off: " << &on_off); - TransportAdapter::Error ret; +void TransportManagerImpl::StopEventsProcessing() { + LOG4CXX_AUTO_TRACE(logger_); + events_processing_is_active_ = false; +} - LOG4CXX_DEBUG(logger_, "Visibility change requested to " << on_off); +void TransportManagerImpl::StartEventsProcessing() { + LOG4CXX_AUTO_TRACE(logger_); + events_processing_is_active_ = true; + events_processing_cond_var_.Broadcast(); +} + +int TransportManagerImpl::PerformActionOnClients( + const TransportAction required_action) const { + LOG4CXX_TRACE(logger_, + "The following action requested: " + << static_cast<int>(required_action) + << " to be performed on connected clients"); if (!is_initialized_) { LOG4CXX_ERROR(logger_, "TM is not initialized"); LOG4CXX_TRACE(logger_, @@ -546,26 +646,82 @@ int TransportManagerImpl::Visibility(const bool& on_off) const { return E_TM_IS_NOT_INITIALIZED; } - for (std::vector<TransportAdapter*>::const_iterator it = - transport_adapters_.begin(); - it != transport_adapters_.end(); - ++it) { - if (on_off) { - ret = (*it)->StartClientListening(); - } else { - ret = (*it)->StopClientListening(); - } + TransportAdapter::Error ret = TransportAdapter::Error::UNKNOWN; + + for (auto adapter_ptr : transport_adapters_) { + ret = adapter_ptr->ChangeClientListening(required_action); + if (TransportAdapter::Error::NOT_SUPPORTED == ret) { LOG4CXX_DEBUG(logger_, - "Visibility change is not supported for adapter " - << *it << "[" << (*it)->GetDeviceType() << "]"); + "Requested action on client is not supported for adapter " + << adapter_ptr << "[" << adapter_ptr->GetDeviceType() + << "]"); } } + LOG4CXX_TRACE(logger_, "exit with E_SUCCESS"); return E_SUCCESS; } -void TransportManagerImpl::UpdateDeviceList(TransportAdapter* ta) { +void TransportManagerImpl::CreateWebEngineDevice() { +#ifndef WEBSOCKET_SERVER_TRANSPORT_SUPPORT + LOG4CXX_TRACE(logger_, "Web engine support is disabled. Exiting function"); +#else + LOG4CXX_AUTO_TRACE(logger_); + auto web_socket_ta_iterator = std::find_if( + transport_adapters_.begin(), + transport_adapters_.end(), + [](const TransportAdapter* ta) { + return transport_adapter::DeviceType::WEBENGINE_WEBSOCKET == + ta->GetDeviceType(); + }); + + if (transport_adapters_.end() == web_socket_ta_iterator) { + LOG4CXX_WARN(logger_, + "WebSocketServerTransportAdapter not found." + "Impossible to create WebEngineDevice"); + return; + } + + auto web_socket_ta = + dynamic_cast<transport_adapter::WebSocketServerTransportAdapter*>( + *web_socket_ta_iterator); + + if (!web_socket_ta) { + LOG4CXX_ERROR(logger_, + "Unable to cast from Transport Adapter to " + "WebSocketServerTransportAdapter." + "Impossible to create WebEngineDevice"); + return; + } + + std::string unique_device_id = web_socket_ta->GetStoredDeviceID(); + + DeviceHandle device_handle = converter_.UidToHandle( + unique_device_id, webengine_constants::kWebEngineConnectionType); + + web_engine_device_info_ = + DeviceInfo(device_handle, + unique_device_id, + webengine_constants::kWebEngineDeviceName, + webengine_constants::kWebEngineConnectionType); + + auto ws_device = std::make_shared<transport_adapter::WebSocketDevice>( + web_engine_device_info_.name(), web_engine_device_info_.mac_address()); + + ws_device->set_keep_on_disconnect(true); + + web_socket_ta->AddDevice(ws_device); + OnDeviceListUpdated(web_socket_ta); +#endif // WEBSOCKET_SERVER_TRANSPORT_SUPPORT +} + +const DeviceInfo& TransportManagerImpl::GetWebEngineDeviceInfo() const { + LOG4CXX_AUTO_TRACE(logger_); + return web_engine_device_info_; +} + +bool TransportManagerImpl::UpdateDeviceList(TransportAdapter* ta) { LOG4CXX_TRACE(logger_, "enter. TransportAdapter: " << ta); std::set<DeviceInfo> old_devices; std::set<DeviceInfo> new_devices; @@ -617,7 +773,9 @@ void TransportManagerImpl::UpdateDeviceList(TransportAdapter* ta) { ++it) { RaiseEvent(&TransportManagerListener::OnDeviceRemoved, *it); } + LOG4CXX_TRACE(logger_, "exit"); + return added_devices.size() + removed_devices.size() > 0; } void TransportManagerImpl::PostMessage( @@ -718,9 +876,9 @@ TransportManagerImpl::ConnectionInternal* TransportManagerImpl::GetActiveConnection( const DeviceUID& device, const ApplicationHandle& application) { LOG4CXX_AUTO_TRACE(logger_); - LOG4CXX_DEBUG(logger_, - "DeviceUID: " << device - << " ApplicationHandle: " << application); + LOG4CXX_DEBUG( + logger_, + "DeviceUID: " << device << " ApplicationHandle: " << application); for (std::vector<ConnectionInternal>::iterator it = connections_.begin(); it != connections_.end(); ++it) { @@ -766,9 +924,9 @@ void TransportManagerImpl::TryDeviceSwitch( IOSBTAdapterFinder()); if (transport_adapters_.end() == ios_bt_adapter) { - LOG4CXX_WARN( - logger_, - "There is no iAP2 Bluetooth adapter found. Switching is not possible."); + LOG4CXX_WARN(logger_, + "There is no iAP2 Bluetooth adapter found. Switching is not " + "possible."); return; } @@ -869,9 +1027,9 @@ bool TransportManagerImpl::UpdateDeviceMapping( item = device_to_adapter_map_.begin(); } - LOG4CXX_DEBUG(logger_, - "After cleanup. Device map size is " - << device_to_adapter_map_.size()); + LOG4CXX_DEBUG( + logger_, + "After cleanup. Device map size is " << device_to_adapter_map_.size()); for (DeviceList::const_iterator it = adapter_device_list.begin(); it != adapter_device_list.end(); @@ -881,10 +1039,10 @@ bool TransportManagerImpl::UpdateDeviceMapping( device_to_adapter_map_.insert(std::make_pair(device_uid, ta)); if (!result.second) { LOG4CXX_WARN(logger_, - "Device UID " - << device_uid - << " is known already. Processing skipped." - "Connection type is: " << ta->GetConnectionType()); + "Device UID " << device_uid + << " is known already. Processing skipped." + "Connection type is: " + << ta->GetConnectionType()); continue; } DeviceHandle device_handle = @@ -896,9 +1054,9 @@ bool TransportManagerImpl::UpdateDeviceMapping( RaiseEvent(&TransportManagerListener::OnDeviceFound, info); } - LOG4CXX_DEBUG(logger_, - "After update. Device map size is " - << device_to_adapter_map_.size()); + LOG4CXX_DEBUG( + logger_, + "After update. Device map size is " << device_to_adapter_map_.size()); return true; } @@ -909,7 +1067,12 @@ void TransportManagerImpl::OnDeviceListUpdated(TransportAdapter* ta) { LOG4CXX_ERROR(logger_, "Device list update failed."); return; } - UpdateDeviceList(ta); + + if (!UpdateDeviceList(ta)) { + LOG4CXX_DEBUG(logger_, "Device list was not changed"); + return; + } + std::vector<DeviceInfo> device_infos; device_list_lock_.AcquireForReading(); for (DeviceInfoList::const_iterator it = device_list_.begin(); @@ -924,6 +1087,13 @@ void TransportManagerImpl::OnDeviceListUpdated(TransportAdapter* ta) { void TransportManagerImpl::Handle(TransportAdapterEvent event) { LOG4CXX_TRACE(logger_, "enter"); + + if (!events_processing_is_active_) { + LOG4CXX_DEBUG(logger_, "Waiting for events handling unlock"); + sync_primitives::AutoLock auto_lock(events_processing_lock_); + events_processing_cond_var_.Wait(auto_lock); + } + switch (event.event_type) { case EventTypeEnum::ON_SEARCH_DONE: { RaiseEvent(&TransportManagerListener::OnScanDevicesFinished); @@ -952,22 +1122,93 @@ void TransportManagerImpl::Handle(TransportAdapterEvent event) { LOG4CXX_DEBUG(logger_, "event_type = ON_FIND_NEW_APPLICATIONS_REQUEST"); break; } + case EventTypeEnum::ON_CONNECTION_STATUS_UPDATED: { + RaiseEvent(&TransportManagerListener::OnConnectionStatusUpdated); + LOG4CXX_DEBUG(logger_, "event_type = ON_CONNECTION_STATUS_UPDATED"); + break; + } + case EventTypeEnum::ON_CONNECT_PENDING: { + const DeviceHandle device_handle = converter_.UidToHandle( + event.device_uid, event.transport_adapter->GetConnectionType()); + int connection_id = 0; + std::vector<ConnectionInternal>::iterator it = connections_.begin(); + std::vector<ConnectionInternal>::iterator end = connections_.end(); + for (; it != end; ++it) { + if (it->transport_adapter != event.transport_adapter) { + continue; + } else if (it->Connection::device != event.device_uid) { + continue; + } else if (it->Connection::application != event.application_id) { + continue; + } else if (it->device_handle_ != device_handle) { + continue; + } else { + LOG4CXX_DEBUG(logger_, "Connection Object Already Exists"); + connection_id = it->Connection::id; + break; + } + } + + if (it == end) { + AddConnection(ConnectionInternal(this, + event.transport_adapter, + ++connection_id_counter_, + event.device_uid, + event.application_id, + device_handle)); + connection_id = connection_id_counter_; + } + + RaiseEvent( + &TransportManagerListener::OnConnectionPending, + DeviceInfo(device_handle, + event.device_uid, + event.transport_adapter->DeviceName(event.device_uid), + event.transport_adapter->GetConnectionType()), + connection_id); + LOG4CXX_DEBUG(logger_, "event_type = ON_CONNECT_PENDING"); + break; + } case EventTypeEnum::ON_CONNECT_DONE: { const DeviceHandle device_handle = converter_.UidToHandle( event.device_uid, event.transport_adapter->GetConnectionType()); - AddConnection(ConnectionInternal(this, - event.transport_adapter, - ++connection_id_counter_, - event.device_uid, - event.application_id, - device_handle)); + + int connection_id = 0; + std::vector<ConnectionInternal>::iterator it = connections_.begin(); + std::vector<ConnectionInternal>::iterator end = connections_.end(); + for (; it != end; ++it) { + if (it->transport_adapter != event.transport_adapter) { + continue; + } else if (it->Connection::device != event.device_uid) { + continue; + } else if (it->Connection::application != event.application_id) { + continue; + } else if (it->device_handle_ != device_handle) { + continue; + } else { + LOG4CXX_DEBUG(logger_, "Connection Object Already Exists"); + connection_id = it->Connection::id; + break; + } + } + + if (it == end) { + AddConnection(ConnectionInternal(this, + event.transport_adapter, + ++connection_id_counter_, + event.device_uid, + event.application_id, + device_handle)); + connection_id = connection_id_counter_; + } + RaiseEvent( &TransportManagerListener::OnConnectionEstablished, DeviceInfo(device_handle, event.device_uid, event.transport_adapter->DeviceName(event.device_uid), event.transport_adapter->GetConnectionType()), - connection_id_counter_); + connection_id); LOG4CXX_DEBUG(logger_, "event_type = ON_CONNECT_DONE"); break; } @@ -1066,7 +1307,7 @@ void TransportManagerImpl::Handle(TransportAdapterEvent event) { LOG4CXX_ERROR(logger_, "Transport adapter failed to send data"); // TODO(YK): potential error case -> thread unsafe // update of message content - if (event.event_data.valid()) { + if (event.event_data.use_count() != 0) { event.event_data->set_waiting(true); } else { LOG4CXX_DEBUG(logger_, "Data is invalid"); @@ -1144,6 +1385,13 @@ void TransportManagerImpl::Handle(TransportAdapterEvent event) { LOG4CXX_DEBUG(logger_, "eevent_type = ON_UNEXPECTED_DISCONNECT"); break; } + case EventTypeEnum::ON_TRANSPORT_CONFIG_UPDATED: { + LOG4CXX_DEBUG(logger_, "event_type = ON_TRANSPORT_CONFIG_UPDATED"); + transport_adapter::TransportConfig config = + event.transport_adapter->GetTransportConfiguration(); + RaiseEvent(&TransportManagerListener::OnTransportConfigUpdated, config); + break; + } } // switch LOG4CXX_TRACE(logger_, "exit"); } @@ -1156,6 +1404,13 @@ void TransportManagerImpl::SetTelemetryObserver(TMTelemetryObserver* observer) { void TransportManagerImpl::Handle(::protocol_handler::RawMessagePtr msg) { LOG4CXX_TRACE(logger_, "enter"); + + if (!events_processing_is_active_) { + LOG4CXX_DEBUG(logger_, "Waiting for events handling unlock"); + sync_primitives::AutoLock auto_lock(events_processing_lock_); + events_processing_cond_var_.Wait(auto_lock); + } + sync_primitives::AutoReadLock lock(connections_lock_); ConnectionInternal* connection = GetConnection(msg->connection_key()); if (connection == NULL) { @@ -1203,9 +1458,9 @@ TransportManagerImpl::ConnectionInternal::ConnectionInternal( const DeviceHandle device_handle) : transport_manager(transport_manager) , transport_adapter(transport_adapter) - , timer(utils::MakeShared<timer::Timer, - const char*, - ::timer::TimerTaskImpl<ConnectionInternal>*>( + , timer(std::make_shared<timer::Timer, + const char*, + ::timer::TimerTaskImpl<ConnectionInternal>*>( "TM DiscRoutine", new ::timer::TimerTaskImpl<ConnectionInternal>( this, &ConnectionInternal::DisconnectFailedRoutine))) diff --git a/src/components/transport_manager/src/usb/libusb/platform_usb_device.cc b/src/components/transport_manager/src/usb/libusb/platform_usb_device.cc index 33709cf0f6..7d7fb49e6e 100644 --- a/src/components/transport_manager/src/usb/libusb/platform_usb_device.cc +++ b/src/components/transport_manager/src/usb/libusb/platform_usb_device.cc @@ -83,5 +83,5 @@ std::string PlatformUsbDevice::GetSerialNumber() const { return GetDescString(device_descriptor_.iSerialNumber); } -} // namespace -} // namespace +} // namespace transport_adapter +} // namespace transport_manager diff --git a/src/components/transport_manager/src/usb/libusb/usb_connection.cc b/src/components/transport_manager/src/usb/libusb/usb_connection.cc index e9ab2bae8e..4b7b22394d 100644 --- a/src/components/transport_manager/src/usb/libusb/usb_connection.cc +++ b/src/components/transport_manager/src/usb/libusb/usb_connection.cc @@ -38,8 +38,8 @@ #include <sstream> -#include "transport_manager/usb/libusb/usb_connection.h" #include "transport_manager/transport_adapter/transport_adapter_impl.h" +#include "transport_manager/usb/libusb/usb_connection.h" #include "utils/logger.h" @@ -137,7 +137,7 @@ void UsbConnection::OnInTransfer(libusb_transfer* transfer) { << transfer->actual_length << ", data:" << hex_data(transfer->buffer, transfer->actual_length)); ::protocol_handler::RawMessagePtr data(new protocol_handler::RawMessage( - 0, 0, in_buffer_, transfer->actual_length)); + 0, 0, in_buffer_, transfer->actual_length, false)); controller_->DataReceiveDone(device_uid_, app_handle_, data); } else { LOG4CXX_ERROR(logger_, @@ -159,26 +159,30 @@ void UsbConnection::OnInTransfer(libusb_transfer* transfer) { LOG4CXX_TRACE(logger_, "exit"); } -void UsbConnection::PopOutMessage() { +TransportAdapter::Error UsbConnection::PopOutMessage() { LOG4CXX_TRACE(logger_, "enter"); bytes_sent_ = 0; + auto error_code = TransportAdapter::OK; if (out_messages_.empty()) { current_out_message_.reset(); } else { current_out_message_ = out_messages_.front(); out_messages_.pop_front(); - PostOutTransfer(); + error_code = PostOutTransfer(); } LOG4CXX_TRACE(logger_, "exit"); + return error_code; } -bool UsbConnection::PostOutTransfer() { +TransportAdapter::Error UsbConnection::PostOutTransfer() { LOG4CXX_TRACE(logger_, "enter"); out_transfer_ = libusb_alloc_transfer(0); - if (0 == out_transfer_) { + if (nullptr == out_transfer_) { LOG4CXX_ERROR(logger_, "libusb_alloc_transfer failed"); - LOG4CXX_TRACE(logger_, "exit with FALSE. Condition: 0 == out_transfer_"); - return false; + LOG4CXX_TRACE(logger_, + "exit with TransportAdapter::BAD_STATE. Condition: nullptr " + "== out_transfer_"); + return TransportAdapter::BAD_STATE; } libusb_fill_bulk_transfer(out_transfer_, device_handle_, @@ -192,43 +196,50 @@ bool UsbConnection::PostOutTransfer() { if (LIBUSB_SUCCESS != libusb_ret) { LOG4CXX_ERROR( logger_, - "libusb_submit_transfer failed: " << libusb_error_name(libusb_ret) - << ". Abort connection."); - AbortConnection(); + "libusb_submit_transfer failed: " << libusb_error_name(libusb_ret)); LOG4CXX_TRACE(logger_, - "exit with FALSE. Condition: " + "exit with TransportAdapter::FAIL. Condition: " << "LIBUSB_SUCCESS != libusb_fill_bulk_transfer"); - return false; + return TransportAdapter::FAIL; } - LOG4CXX_TRACE(logger_, "exit with TRUE"); - return true; + LOG4CXX_TRACE(logger_, "exit with TransportAdapter::OK"); + return TransportAdapter::OK; } void UsbConnection::OnOutTransfer(libusb_transfer* transfer) { LOG4CXX_TRACE(logger_, "enter with Libusb_transfer*: " << transfer); - sync_primitives::AutoLock locker(out_messages_mutex_); - if (transfer->status == LIBUSB_TRANSFER_COMPLETED) { - bytes_sent_ += transfer->actual_length; - if (bytes_sent_ == current_out_message_->data_size()) { - LOG4CXX_DEBUG( + auto error_code = TransportAdapter::OK; + { + sync_primitives::AutoLock locker(out_messages_mutex_); + if (LIBUSB_TRANSFER_COMPLETED == transfer->status) { + bytes_sent_ += transfer->actual_length; + if (current_out_message_->data_size() == bytes_sent_) { + LOG4CXX_DEBUG( + logger_, + "USB out transfer, data sent: " << current_out_message_.get()); + controller_->DataSendDone( + device_uid_, app_handle_, current_out_message_); + error_code = PopOutMessage(); + } + } else { + LOG4CXX_ERROR( logger_, - "USB out transfer, data sent: " << current_out_message_.get()); - controller_->DataSendDone(device_uid_, app_handle_, current_out_message_); - PopOutMessage(); + "USB out transfer failed: " << libusb_error_name(transfer->status)); + controller_->DataSendFailed( + device_uid_, app_handle_, current_out_message_, DataSendError()); + error_code = PopOutMessage(); + } + if (current_out_message_.use_count() == 0) { + libusb_free_transfer(transfer); + out_transfer_ = nullptr; + waiting_out_transfer_cancel_ = false; } - } else { - LOG4CXX_ERROR( - logger_, - "USB out transfer failed: " << libusb_error_name(transfer->status)); - controller_->DataSendFailed( - device_uid_, app_handle_, current_out_message_, DataSendError()); - PopOutMessage(); } - if (!current_out_message_.valid()) { - libusb_free_transfer(transfer); - out_transfer_ = NULL; - waiting_out_transfer_cancel_ = false; + + if (TransportAdapter::FAIL == error_code) { + AbortConnection(); } + LOG4CXX_TRACE(logger_, "exit"); } @@ -241,22 +252,35 @@ TransportAdapter::Error UsbConnection::SendData( << "disconnecting_"); return TransportAdapter::BAD_STATE; } - sync_primitives::AutoLock locker(out_messages_mutex_); - if (current_out_message_.valid()) { - out_messages_.push_back(message); - } else { - current_out_message_ = message; - if (!PostOutTransfer()) { - controller_->DataSendFailed( - device_uid_, app_handle_, message, DataSendError()); - LOG4CXX_TRACE( - logger_, - "exit with TransportAdapter::FAIL. Condition: !PostOutTransfer()"); - return TransportAdapter::FAIL; + + auto process_message = [this, &message]() { + sync_primitives::AutoLock locker(out_messages_mutex_); + if (current_out_message_.use_count() == 0) { + current_out_message_ = message; + return PostOutTransfer(); } + out_messages_.push_back(message); + return TransportAdapter::OK; + }; + + auto error_code = process_message(); + + if (TransportAdapter::OK == error_code) { + LOG4CXX_TRACE(logger_, "exit with TransportAdapter::OK."); + return TransportAdapter::OK; } - LOG4CXX_TRACE(logger_, "exit with TransportAdapter::OK."); - return TransportAdapter::OK; + + controller_->DataSendFailed( + device_uid_, app_handle_, message, DataSendError()); + + if (TransportAdapter::FAIL == error_code) { + AbortConnection(); + } + + LOG4CXX_TRACE(logger_, + "exit with TransportAdapter::FAIL. PostOutTransfer сondition: " + << error_code); + return TransportAdapter::FAIL; } void UsbConnection::Finalise() { @@ -293,9 +317,9 @@ void UsbConnection::Finalise() { void UsbConnection::AbortConnection() { LOG4CXX_TRACE(logger_, "enter"); + Finalise(); controller_->ConnectionAborted( device_uid_, app_handle_, CommunicationError()); - Disconnect(); LOG4CXX_TRACE(logger_, "exit"); } diff --git a/src/components/transport_manager/src/usb/libusb/usb_handler.cc b/src/components/transport_manager/src/usb/libusb/usb_handler.cc index c62e80d1be..d1fc0af7f7 100644 --- a/src/components/transport_manager/src/usb/libusb/usb_handler.cc +++ b/src/components/transport_manager/src/usb/libusb/usb_handler.cc @@ -33,14 +33,14 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#include <cstring> #include <cstdlib> +#include <cstring> -#include "transport_manager/usb/common.h" #include "transport_manager/transport_adapter/transport_adapter_impl.h" +#include "transport_manager/usb/common.h" -#include "utils/macro.h" #include "utils/logger.h" +#include "utils/macro.h" #include "utils/threads/thread.h" namespace transport_manager { @@ -77,12 +77,12 @@ class UsbHandler::ControlTransferSequenceState { UsbHandler::UsbHandler() : shutdown_requested_(false) - , thread_(NULL) + , thread_(nullptr) , usb_device_listeners_() , devices_() , transfer_sequences_() , device_handles_to_close_() - , libusb_context_(NULL) + , libusb_context_(nullptr) , arrived_callback_handle_() , left_callback_handle_() { thread_ = threads::CreateThread("UsbHandler", new UsbHandlerDelegate(this)); @@ -90,20 +90,24 @@ UsbHandler::UsbHandler() UsbHandler::~UsbHandler() { shutdown_requested_ = true; - if (libusb_context_ != 0) { + LOG4CXX_INFO(logger_, "UsbHandler thread finished"); + + if (libusb_context_) { + // The libusb_hotplug_deregister_callback() wakes up blocking call of + // libusb_handle_events_completed() in the Thread() method of delegate libusb_hotplug_deregister_callback(libusb_context_, arrived_callback_handle_); libusb_hotplug_deregister_callback(libusb_context_, left_callback_handle_); } - thread_->stop(); - LOG4CXX_INFO(logger_, "UsbHandler thread finished"); - if (libusb_context_) { - libusb_exit(libusb_context_); - libusb_context_ = 0; - } + thread_->join(); delete thread_->delegate(); threads::DeleteThread(thread_); + + if (libusb_context_) { + libusb_exit(libusb_context_); + libusb_context_ = nullptr; + } } void UsbHandler::DeviceArrived(libusb_device* device_libusb) { @@ -518,5 +522,11 @@ void UsbHandler::UsbHandlerDelegate::threadMain() { handler_->Thread(); } +void UsbHandler::UsbHandlerDelegate::exitThreadMain() { + LOG4CXX_AUTO_TRACE(logger_); + // Empty method required in order to avoid force delegate thread + // finishing by exitThreadMain() of the base class +} + } // namespace transport_adapter } // namespace transport_manager diff --git a/src/components/transport_manager/src/usb/qnx/platform_usb_device.cc b/src/components/transport_manager/src/usb/qnx/platform_usb_device.cc index e85ab12b10..bb6e341659 100644 --- a/src/components/transport_manager/src/usb/qnx/platform_usb_device.cc +++ b/src/components/transport_manager/src/usb/qnx/platform_usb_device.cc @@ -76,5 +76,5 @@ std::string PlatformUsbDevice::GetSerialNumber() const { return GetDescString(device_descriptor_.iSerialNumber); } -} // namespace -} // namespace +} // namespace transport_adapter +} // namespace transport_manager diff --git a/src/components/transport_manager/src/usb/qnx/usb_connection.cc b/src/components/transport_manager/src/usb/qnx/usb_connection.cc index 516a367ebc..2945639ce9 100644 --- a/src/components/transport_manager/src/usb/qnx/usb_connection.cc +++ b/src/components/transport_manager/src/usb/qnx/usb_connection.cc @@ -37,8 +37,8 @@ #include <sched.h> #include <cstring> -#include "transport_manager/usb/qnx/usb_connection.h" #include "transport_manager/transport_adapter/transport_adapter_impl.h" +#include "transport_manager/usb/qnx/usb_connection.h" #include "utils/logger.h" @@ -144,7 +144,7 @@ void UsbConnection::OnInTransfer(usbd_urb* urb) { device_uid_, app_handle_, DataReceiveError()); } else { ::protocol_handler::RawMessagePtr msg( - new protocol_handler::RawMessage(0, 0, in_buffer_, len)); + new protocol_handler::RawMessage(0, 0, in_buffer_, len, false)); controller_->DataReceiveDone(device_uid_, app_handle_, msg); } @@ -230,7 +230,7 @@ void UsbConnection::OnOutTransfer(usbd_urb* urb) { } } - if ((!disconnecting_) && current_out_message_.valid()) { + if ((!disconnecting_) && (current_out_message_.use_count() != 0)) { PostOutTransfer(); } else { pending_out_transfer_ = false; @@ -243,7 +243,7 @@ TransportAdapter::Error UsbConnection::SendData( return TransportAdapter::BAD_STATE; } sync_primitives::AutoLock locker(out_messages_mutex_); - if (current_out_message_.valid()) { + if (current_out_message_.use_count() != 0) { out_messages_.push_back(message); } else { current_out_message_ = message; diff --git a/src/components/transport_manager/src/usb/qnx/usb_handler.cc b/src/components/transport_manager/src/usb/qnx/usb_handler.cc index 2336a05f16..9ac4a40d92 100644 --- a/src/components/transport_manager/src/usb/qnx/usb_handler.cc +++ b/src/components/transport_manager/src/usb/qnx/usb_handler.cc @@ -35,11 +35,11 @@ #include <errno.h> -#include <cstring> #include <cstdlib> +#include <cstring> -#include "transport_manager/usb/common.h" #include "transport_manager/transport_adapter/transport_adapter_impl.h" +#include "transport_manager/usb/common.h" #include "utils/logger.h" @@ -321,5 +321,5 @@ TransportAdapter::Error UsbHandler::Init() { return TransportAdapter::OK; } -} // namespace -} // namespace +} // namespace transport_adapter +} // namespace transport_manager diff --git a/src/components/transport_manager/src/usb/usb_aoa_adapter.cc b/src/components/transport_manager/src/usb/usb_aoa_adapter.cc index b7faf1ef6b..9ce571eb6c 100644 --- a/src/components/transport_manager/src/usb/usb_aoa_adapter.cc +++ b/src/components/transport_manager/src/usb/usb_aoa_adapter.cc @@ -34,21 +34,21 @@ */ #include "transport_manager/usb/usb_aoa_adapter.h" -#include "transport_manager/usb/usb_device_scanner.h" -#include "transport_manager/usb/usb_connection_factory.h" #include "transport_manager/usb/common.h" +#include "transport_manager/usb/usb_connection_factory.h" +#include "transport_manager/usb/usb_device_scanner.h" #include "utils/logger.h" namespace transport_manager { namespace transport_adapter { CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager") -UsbAoaAdapter::UsbAoaAdapter(resumption::LastState& last_state, +UsbAoaAdapter::UsbAoaAdapter(resumption::LastStateWrapperPtr last_state_wrapper, const TransportManagerSettings& settings) - : TransportAdapterImpl(new UsbDeviceScanner(this), + : TransportAdapterImpl(new UsbDeviceScanner(this, settings), new UsbConnectionFactory(this), NULL, - last_state, + last_state_wrapper, settings) , is_initialised_(false) , usb_handler_(new UsbHandler()) { diff --git a/src/components/transport_manager/src/usb/usb_connection_factory.cc b/src/components/transport_manager/src/usb/usb_connection_factory.cc index 1136dfad21..401dde2534 100644 --- a/src/components/transport_manager/src/usb/usb_connection_factory.cc +++ b/src/components/transport_manager/src/usb/usb_connection_factory.cc @@ -31,10 +31,9 @@ */ #include "transport_manager/usb/usb_connection_factory.h" -#include "transport_manager/usb/usb_device.h" #include "transport_manager/transport_adapter/transport_adapter_impl.h" +#include "transport_manager/usb/usb_device.h" #include "utils/logger.h" -#include "utils/make_shared.h" #if defined(__QNXNTO__) #include "transport_manager/usb/qnx/usb_connection.h" @@ -65,21 +64,21 @@ TransportAdapter::Error UsbConnectionFactory::CreateConnection( "enter DeviceUID: " << &device_uid << ", ApplicationHandle: " << &app_handle); DeviceSptr device = controller_->FindDevice(device_uid); - if (!device.valid()) { + if (device.use_count() == 0) { LOG4CXX_ERROR(logger_, "device " << device_uid << " not found"); - LOG4CXX_TRACE( - logger_, - "exit with TransportAdapter::BAD_PARAM. Condition: !device.valid()"); + LOG4CXX_TRACE(logger_, + "exit with TransportAdapter::BAD_PARAM. Condition: " + "device.use_count() == 0"); return TransportAdapter::BAD_PARAM; } UsbDevice* usb_device = static_cast<UsbDevice*>(device.get()); - utils::SharedPtr<UsbConnection> connection = - utils::MakeShared<UsbConnection>(device_uid, - app_handle, - controller_, - usb_handler_, - usb_device->usb_device()); + std::shared_ptr<UsbConnection> connection = + std::make_shared<UsbConnection>(device_uid, + app_handle, + controller_, + usb_handler_, + usb_device->usb_device()); controller_->ConnectionCreated(connection, device_uid, app_handle); if (connection->Init()) { LOG4CXX_INFO(logger_, "USB connection initialised"); diff --git a/src/components/transport_manager/src/usb/usb_device_scanner.cc b/src/components/transport_manager/src/usb/usb_device_scanner.cc index 092fd29f1a..51d521c1bf 100644 --- a/src/components/transport_manager/src/usb/usb_device_scanner.cc +++ b/src/components/transport_manager/src/usb/usb_device_scanner.cc @@ -33,9 +33,9 @@ #include <sstream> #include "transport_manager/transport_adapter/transport_adapter_impl.h" -#include "transport_manager/usb/usb_device_scanner.h" -#include "transport_manager/usb/usb_device.h" #include "transport_manager/usb/common.h" +#include "transport_manager/usb/usb_device.h" +#include "transport_manager/usb/usb_device_scanner.h" #include "utils/logger.h" @@ -47,6 +47,7 @@ CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager") class AoaInitSequence : public UsbControlTransferSequence { public: AoaInitSequence(); + AoaInitSequence(const TransportManagerSettings& settings); virtual ~AoaInitSequence() {} private: @@ -86,8 +87,9 @@ void UsbDeviceScanner::OnDeviceLeft(PlatformUsbDevice* device) { } } -UsbDeviceScanner::UsbDeviceScanner(TransportAdapterController* controller) - : controller_(controller) {} +UsbDeviceScanner::UsbDeviceScanner(TransportAdapterController* controller, + const TransportManagerSettings& settings) + : controller_(controller), settings_(settings) {} UsbDeviceScanner::~UsbDeviceScanner() {} @@ -171,28 +173,29 @@ class AoaInitSequence::AoaTurnIntoAccessoryMode : public UsbControlOutTransfer { } }; -static char manufacturer[] = "SDL"; -static char model_name[] = "Core"; -static char description[] = "SmartDeviceLink Core Component USB"; -static char version[] = "1.0"; -static char uri[] = "http://www.smartdevicelink.org"; -static char serial_num[] = "N000000"; - -AoaInitSequence::AoaInitSequence() : UsbControlTransferSequence() { +AoaInitSequence::AoaInitSequence(const TransportManagerSettings& settings) + : UsbControlTransferSequence() { + auto manufacturer = settings.aoa_filter_manufacturer().c_str(); + auto model_name = settings.aoa_filter_model_name().c_str(); + auto description = settings.aoa_filter_description().c_str(); + auto version = settings.aoa_filter_version().c_str(); + auto uri = settings.aoa_filter_uri().c_str(); + auto serial_num = settings.aoa_filter_serial_number().c_str(); AddTransfer(new AoaGetProtocolRequest); - AddTransfer(new AoaSendIdString(0, manufacturer, sizeof(manufacturer))); - AddTransfer(new AoaSendIdString(1, model_name, sizeof(model_name))); - AddTransfer(new AoaSendIdString(2, description, sizeof(description))); - AddTransfer(new AoaSendIdString(3, version, sizeof(version))); - AddTransfer(new AoaSendIdString(4, uri, sizeof(uri))); - AddTransfer(new AoaSendIdString(5, serial_num, sizeof(serial_num))); + AddTransfer(new AoaSendIdString(0, manufacturer, strlen(manufacturer))); + AddTransfer(new AoaSendIdString(1, model_name, strlen(model_name))); + AddTransfer(new AoaSendIdString(2, description, strlen(description))); + AddTransfer(new AoaSendIdString(3, version, strlen(version))); + AddTransfer(new AoaSendIdString(4, uri, strlen(uri))); + AddTransfer(new AoaSendIdString(5, serial_num, strlen(serial_num))); AddTransfer(new AoaTurnIntoAccessoryMode); } void UsbDeviceScanner::TurnIntoAccessoryMode(PlatformUsbDevice* device) { LOG4CXX_AUTO_TRACE(logger_); LOG4CXX_DEBUG(logger_, "PlatformUsbDevice: " << device); - GetUsbHandler()->StartControlTransferSequence(new AoaInitSequence, device); + GetUsbHandler()->StartControlTransferSequence(new AoaInitSequence(settings_), + device); } void UsbDeviceScanner::SupportedDeviceFound(PlatformUsbDevice* device) { @@ -247,5 +250,5 @@ bool UsbDeviceScanner::IsInitialised() const { return true; } -} // namespace -} // namespace +} // namespace transport_adapter +} // namespace transport_manager diff --git a/src/components/transport_manager/src/websocket_server/websocket_connection.cc b/src/components/transport_manager/src/websocket_server/websocket_connection.cc new file mode 100644 index 0000000000..7bcc4baef2 --- /dev/null +++ b/src/components/transport_manager/src/websocket_server/websocket_connection.cc @@ -0,0 +1,218 @@ +/* +Copyright (c) 2020 Livio, Inc. +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of SmartDeviceLink Consortium, Inc. nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +#include "transport_manager/websocket_server/websocket_connection.h" +#include <unistd.h> +#include "transport_manager/transport_adapter/transport_adapter_controller.h" + +namespace transport_manager { +namespace transport_adapter { + +CREATE_LOGGERPTR_GLOBAL(wsc_logger_, "WebSocketConnection") + +using namespace boost::beast::websocket; + +template <> +WebSocketConnection<WebSocketSession<> >::WebSocketConnection( + const DeviceUID& device_uid, + const ApplicationHandle& app_handle, + boost::asio::ip::tcp::socket socket, + TransportAdapterController* controller) + : device_uid_(device_uid) + , app_handle_(app_handle) + , session_(new WebSocketSession<>( + std::move(socket), + std::bind( + &WebSocketConnection::DataReceive, this, std::placeholders::_1), + std::bind(&WebSocketConnection::OnError, this))) + , controller_(controller) + , shutdown_(false) + , thread_delegate_(new LoopThreadDelegate( + &message_queue_, + std::bind(&WebSocketSession<>::WriteDown, + session_.get(), + std::placeholders::_1), + std::bind(&WebSocketConnection::OnError, this))) + , thread_(threads::CreateThread("WS Async Send", thread_delegate_)) { + thread_->start(threads::ThreadOptions()); +} + +#ifdef ENABLE_SECURITY +template <> +WebSocketConnection<WebSocketSecureSession<> >::WebSocketConnection( + const DeviceUID& device_uid, + const ApplicationHandle& app_handle, + boost::asio::ip::tcp::socket socket, + ssl::context& ctx, + TransportAdapterController* controller) + : device_uid_(device_uid) + , app_handle_(app_handle) + , session_(new WebSocketSecureSession<>( + std::move(socket), + ctx, + std::bind( + &WebSocketConnection::DataReceive, this, std::placeholders::_1), + std::bind(&WebSocketConnection::OnError, this))) + , controller_(controller) + , shutdown_(false) + , thread_delegate_(new LoopThreadDelegate( + &message_queue_, + std::bind(&WebSocketSecureSession<>::WriteDown, + session_.get(), + std::placeholders::_1), + std::bind(&WebSocketConnection::OnError, this))) + , thread_(threads::CreateThread("WS Async Send", thread_delegate_)) { + thread_->start(threads::ThreadOptions()); +} +template class WebSocketConnection<WebSocketSecureSession<> >; +#endif // ENABLE_SECURITY + +template <typename Session> +WebSocketConnection<Session>::~WebSocketConnection() { + if (!IsShuttingDown()) { + Shutdown(); + } +} + +template <typename Session> +void WebSocketConnection<Session>::OnError() { + LOG4CXX_AUTO_TRACE(wsc_logger_); + + controller_->ConnectionAborted( + device_uid_, app_handle_, CommunicationError()); + + session_->Shutdown(); +} + +template <typename Session> +TransportAdapter::Error WebSocketConnection<Session>::Disconnect() { + LOG4CXX_AUTO_TRACE(wsc_logger_); + if (!IsShuttingDown()) { + Shutdown(); + controller_->DisconnectDone(device_uid_, app_handle_); + return TransportAdapter::OK; + } + return TransportAdapter::BAD_STATE; +} + +template <typename Session> +TransportAdapter::Error WebSocketConnection<Session>::SendData( + ::protocol_handler::RawMessagePtr message) { + if (IsShuttingDown()) { + return TransportAdapter::BAD_STATE; + } + + message_queue_.push(message); + + return TransportAdapter::OK; +} + +template <typename Session> +void WebSocketConnection<Session>::DataReceive( + protocol_handler::RawMessagePtr frame) { + controller_->DataReceiveDone(device_uid_, app_handle_, frame); +} + +template <typename Session> +void WebSocketConnection<Session>::Run() { + LOG4CXX_AUTO_TRACE(wsc_logger_); + session_->AsyncAccept(); +} + +template <typename Session> +void WebSocketConnection<Session>::Shutdown() { + LOG4CXX_AUTO_TRACE(wsc_logger_); + shutdown_ = true; + if (thread_delegate_) { + session_->Shutdown(); + thread_delegate_->SetShutdown(); + thread_->join(); + delete thread_delegate_; + thread_delegate_ = nullptr; + threads::DeleteThread(thread_); + thread_ = nullptr; + } +} + +template <typename Session> +bool WebSocketConnection<Session>::IsShuttingDown() { + return shutdown_; +} + +template <typename Session> +WebSocketConnection<Session>::LoopThreadDelegate::LoopThreadDelegate( + MessageQueue<Message, AsyncQueue>* message_queue, + DataWriteCallback data_write, + OnIOErrorCallback on_io_error) + : message_queue_(*message_queue) + , data_write_(data_write) + , on_io_error_(on_io_error) {} + +template <typename Session> +void WebSocketConnection<Session>::LoopThreadDelegate::threadMain() { + while (!message_queue_.IsShuttingDown()) { + DrainQueue(); + message_queue_.wait(); + } + DrainQueue(); +} + +template <typename Session> +void WebSocketConnection<Session>::LoopThreadDelegate::exitThreadMain() { + if (!message_queue_.IsShuttingDown()) { + message_queue_.Shutdown(); + } +} + +template <typename Session> +void WebSocketConnection<Session>::LoopThreadDelegate::DrainQueue() { + Message message_ptr; + while (!message_queue_.IsShuttingDown() && message_queue_.pop(message_ptr)) { + auto res = data_write_(message_ptr); + if (TransportAdapter::FAIL == res) { + LOG4CXX_WARN(wsc_logger_, + "Writing to websocket stream failed. Will now close " + "websocket connection."); + on_io_error_(); + } + } +} + +template <typename Session> +void WebSocketConnection<Session>::LoopThreadDelegate::SetShutdown() { + if (!message_queue_.IsShuttingDown()) { + message_queue_.Shutdown(); + } +} + +template class WebSocketConnection<WebSocketSession<> >; + +} // namespace transport_adapter +} // namespace transport_manager diff --git a/src/components/transport_manager/src/websocket_server/websocket_device.cc b/src/components/transport_manager/src/websocket_server/websocket_device.cc new file mode 100644 index 0000000000..502daf349b --- /dev/null +++ b/src/components/transport_manager/src/websocket_server/websocket_device.cc @@ -0,0 +1,94 @@ +/* + * + * Copyright (c) 2020, Livio + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following + * disclaimer in the documentation and/or other materials provided with the + * distribution. + * + * Neither the name of the Ford Motor Company nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "transport_manager/websocket_server/websocket_device.h" + +#include "utils/logger.h" + +namespace transport_manager { +namespace transport_adapter { +CREATE_LOGGERPTR_GLOBAL(logger_, "TransportManager") + +WebSocketDevice::WebSocketDevice(const std::string& name, + const DeviceUID& unique_device_id) + : Device(name, unique_device_id) + , is_secure_connect_(false) + , protocol_(boost::asio::ip::tcp::v4()) {} + +bool WebSocketDevice::IsSameAs(const Device* other) const { + LOG4CXX_TRACE(logger_, "enter. device: " << other); + + const WebSocketDevice* other_websocket_device = + dynamic_cast<const WebSocketDevice*>(other); + + if (!other_websocket_device) { + return false; + } + + if (GetHost() != other_websocket_device->GetHost()) { + return false; + } + + if (GetPort() != other_websocket_device->GetPort()) { + return false; + } + + if (GetTarget() != other_websocket_device->GetTarget()) { + return false; + } + + return true; +} + +ApplicationList WebSocketDevice::GetApplicationList() const { + return app_list_; +} + +const std::string& WebSocketDevice::GetHost() const { + return host_; +} + +const std::string& WebSocketDevice::GetPort() const { + return port_; +} + +const std::string WebSocketDevice::GetTarget() const { + return host_ + port_ + name(); +} + +void WebSocketDevice::AddApplication(const ApplicationHandle& app_handle) { + app_list_.push_back(app_handle); +} + +} // namespace transport_adapter +} // namespace transport_manager diff --git a/src/components/transport_manager/src/websocket_server/websocket_listener.cc b/src/components/transport_manager/src/websocket_server/websocket_listener.cc new file mode 100644 index 0000000000..87fff3acbc --- /dev/null +++ b/src/components/transport_manager/src/websocket_server/websocket_listener.cc @@ -0,0 +1,299 @@ +#include "transport_manager/websocket_server/websocket_listener.h" +#include "transport_manager/transport_adapter/transport_adapter_controller.h" +#include "transport_manager/websocket_server/websocket_device.h" +#include "utils/file_system.h" + +namespace transport_manager { +namespace transport_adapter { +CREATE_LOGGERPTR_GLOBAL(logger_, "WebSocketListener") + +WebSocketListener::WebSocketListener(TransportAdapterController* controller, + const TransportManagerSettings& settings, + const int num_threads) + : controller_(controller) + , ioc_(num_threads) +#ifdef ENABLE_SECURITY + , ctx_(ssl::context::sslv23) + , start_secure_(false) +#endif // ENABLE_SECURITY + , acceptor_(ioc_) + , socket_(ioc_) + , io_pool_(new boost::asio::thread_pool(num_threads)) + , num_threads_(num_threads) + , shutdown_(false) + , settings_(settings) { +} + +WebSocketListener::~WebSocketListener() { + Terminate(); +} + +TransportAdapter::Error WebSocketListener::Init() { + LOG4CXX_AUTO_TRACE(logger_); + const auto old_shutdown_value = shutdown_.exchange(false); + if (old_shutdown_value) { + ioc_.restart(); + io_pool_.reset(new boost::asio::thread_pool(num_threads_)); + } + return StartListening(); +} + +void WebSocketListener::Terminate() { + LOG4CXX_AUTO_TRACE(logger_); + Shutdown(); +} + +TransportAdapter::Error WebSocketListener::StartListening() { + LOG4CXX_AUTO_TRACE(logger_); + if (acceptor_.is_open()) { + return TransportAdapter::OK; + } + +#ifdef ENABLE_SECURITY + auto const ta_error = AddCertificateAuthority(); + if (TransportAdapter::OK != ta_error) { + return ta_error; + } +#endif + + auto const address = + boost::asio::ip::make_address(settings_.websocket_server_address()); + tcp::endpoint endpoint = {address, settings_.websocket_server_port()}; + + // Open the acceptor + boost::system::error_code ec; + acceptor_.open(endpoint.protocol(), ec); + if (ec) { + auto str_err = "ErrorOpen: " + ec.message(); + LOG4CXX_ERROR(logger_, + str_err << " host/port: " << endpoint.address().to_string() + << "/" << endpoint.port()); + return TransportAdapter::FAIL; + } + + acceptor_.set_option(boost::asio::socket_base::reuse_address(true), ec); + if (ec) { + std::string str_err = "ErrorSetOption: " + ec.message(); + LOG4CXX_ERROR(logger_, + str_err << " host/port: " << endpoint.address().to_string() + << "/" << endpoint.port()); + return TransportAdapter::FAIL; + } + + // Bind to the server address + acceptor_.bind(endpoint, ec); + if (ec) { + std::string str_err = "ErrorBind: " + ec.message(); + LOG4CXX_ERROR(logger_, + str_err << " host/port: " << endpoint.address().to_string() + << "/" << endpoint.port()); + return TransportAdapter::FAIL; + } + + // Start listening for connections + acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec); + if (ec) { + std::string str_err = "ErrorListen: " + ec.message(); + LOG4CXX_ERROR(logger_, + str_err << " host/port: " << endpoint.address().to_string() + << "/" << endpoint.port()); + return TransportAdapter::FAIL; + } + + if (false == Run()) { + return TransportAdapter::FAIL; + } + + return TransportAdapter::OK; +} + +#ifdef ENABLE_SECURITY +TransportAdapter::Error WebSocketListener::AddCertificateAuthority() { + LOG4CXX_AUTO_TRACE(logger_); + + const auto cert_path = settings_.ws_server_cert_path(); + LOG4CXX_DEBUG(logger_, "Path to certificate : " << cert_path); + const auto key_path = settings_.ws_server_key_path(); + LOG4CXX_DEBUG(logger_, "Path to key : " << key_path); + const auto ca_cert_path = settings_.ws_server_ca_cert_path(); + LOG4CXX_DEBUG(logger_, "Path to ca cert : " << ca_cert_path); + start_secure_ = settings_.wss_server_supported(); + + if (start_secure_ && (!file_system::FileExists(cert_path) || + !file_system::FileExists(key_path) || + !file_system::FileExists(ca_cert_path))) { + LOG4CXX_ERROR(logger_, "Certificate or key file not found"); + return TransportAdapter::FAIL; + } + + if (!start_secure_) { + auto check_config = [](const std::string& config, + const std::string config_name) { + bool start_unsecure = config.empty(); + if (!start_unsecure) { + LOG4CXX_ERROR(logger_, + "Configuration for secure WS is incomplete. " + << config_name + << " config is " + "present, meanwhile others may be missing. Please " + "check INI file"); + } + return start_unsecure; + }; + if (!check_config(cert_path, "Server cert") || + !check_config(key_path, "Server key") || + !check_config(ca_cert_path, "CA cert")) { + return TransportAdapter::FAIL; + } + } else { + LOG4CXX_INFO(logger_, "WebSocket server will start secure connection"); + ctx_.add_verify_path(cert_path); + ctx_.set_options(boost::asio::ssl::context::default_workarounds); + using context = boost::asio::ssl::context_base; + ctx_.set_verify_mode(ssl::verify_peer | ssl::verify_fail_if_no_peer_cert); + boost::system::error_code sec_ec; + ctx_.use_certificate_chain_file(cert_path, sec_ec); + ctx_.load_verify_file(ca_cert_path); + if (sec_ec) { + LOG4CXX_ERROR( + logger_, + "Loading WS server certificate failed: " << sec_ec.message()); + return TransportAdapter::FAIL; + } + sec_ec.clear(); + ctx_.use_private_key_file(key_path, context::pem, sec_ec); + if (sec_ec) { + LOG4CXX_ERROR(logger_, + "Loading WS server key failed: " << sec_ec.message()); + return TransportAdapter::FAIL; + } + } + + return TransportAdapter::OK; +} +#endif // ENABLE_SECURITY + +bool WebSocketListener::Run() { + LOG4CXX_AUTO_TRACE(logger_); + const bool is_connection_open = WaitForConnection(); + if (is_connection_open) { + boost::asio::post(*io_pool_.get(), [&]() { ioc_.run(); }); + } else { + LOG4CXX_ERROR(logger_, "Connection is shutdown or acceptor isn't open"); + } + + return is_connection_open; +} + +bool WebSocketListener::WaitForConnection() { + LOG4CXX_AUTO_TRACE(logger_); + if (!shutdown_ && acceptor_.is_open()) { + acceptor_.async_accept( + socket_, + std::bind( + &WebSocketListener::StartSession, this, std::placeholders::_1)); + return true; + } + return false; +} + +template <> +void WebSocketListener::ProcessConnection( + std::shared_ptr<WebSocketConnection<WebSocketSession<> > > connection, + const DeviceSptr device, + const ApplicationHandle app_handle) { + LOG4CXX_AUTO_TRACE(logger_); + controller_->ConnectionCreated( + connection, device->unique_device_id(), app_handle); + + controller_->ConnectDone(device->unique_device_id(), app_handle); + + connection->Run(); + + connection_list_lock.Acquire(); + connection_list_.push_back(connection); + connection_list_lock.Release(); + + WaitForConnection(); +} + +#ifdef ENABLE_SECURITY +template <> +void WebSocketListener::ProcessConnection( + std::shared_ptr<WebSocketConnection<WebSocketSecureSession<> > > connection, + const DeviceSptr device, + const ApplicationHandle app_handle) { + LOG4CXX_AUTO_TRACE(logger_); + controller_->ConnectionCreated( + connection, device->unique_device_id(), app_handle); + + controller_->ConnectDone(device->unique_device_id(), app_handle); + + connection->Run(); + + connection_list_lock.Acquire(); + connection_list_.push_back(connection); + connection_list_lock.Release(); + + WaitForConnection(); +} +#endif // ENABLE_SECURITY + +void WebSocketListener::StartSession(boost::system::error_code ec) { + LOG4CXX_AUTO_TRACE(logger_); + if (ec) { + std::string str_err = "ErrorAccept: " + ec.message(); + LOG4CXX_ERROR(logger_, str_err); + return; + } + + if (shutdown_) { + return; + } + + const ApplicationHandle app_handle = socket_.native_handle(); + + std::shared_ptr<WebSocketDevice> device = + std::static_pointer_cast<WebSocketDevice>( + controller_->GetWebEngineDevice()); + + LOG4CXX_INFO(logger_, "Connected client: " << app_handle); + +#ifdef ENABLE_SECURITY + if (start_secure_) { + auto connection = + std::make_shared<WebSocketConnection<WebSocketSecureSession<> > >( + device->unique_device_id(), + app_handle, + std::move(socket_), + ctx_, + controller_); + ProcessConnection(connection, device, app_handle); + return; + } +#endif // ENABLE_SECURITY + + auto connection = std::make_shared<WebSocketConnection<WebSocketSession<> > >( + device->unique_device_id(), app_handle, std::move(socket_), controller_); + ProcessConnection(connection, device, app_handle); +} + +void WebSocketListener::Shutdown() { + LOG4CXX_AUTO_TRACE(logger_); + if (false == shutdown_.exchange(true)) { + ioc_.stop(); + socket_.close(); + boost::system::error_code ec; + acceptor_.close(ec); + + if (ec) { + LOG4CXX_ERROR(logger_, "Acceptor closed with error: " << ec); + } + + io_pool_->stop(); + io_pool_->join(); + } +} + +} // namespace transport_adapter +} // namespace transport_manager diff --git a/src/components/transport_manager/src/websocket_server/websocket_secure_session.cc b/src/components/transport_manager/src/websocket_server/websocket_secure_session.cc new file mode 100644 index 0000000000..9be94119b6 --- /dev/null +++ b/src/components/transport_manager/src/websocket_server/websocket_secure_session.cc @@ -0,0 +1,79 @@ +/* +Copyright (c) 2020 Livio, Inc. +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of SmartDeviceLink Consortium, Inc. nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +#include "transport_manager/websocket_server/websocket_secure_session.h" +#include <unistd.h> +#include "transport_manager/transport_adapter/transport_adapter_controller.h" + +namespace transport_manager { +namespace transport_adapter { + +using namespace boost::beast::websocket; + +template <typename ExecutorType> +WebSocketSecureSession<ExecutorType>::WebSocketSecureSession( + tcp::socket socket, + ssl::context& ctx, + DataReceiveCallback data_receive, + OnIOErrorCallback on_error) + : WebSocketSession<ExecutorType>( + std::move(socket), ctx, data_receive, on_error) {} + +template <typename ExecutorType> +void WebSocketSecureSession<ExecutorType>::AsyncAccept() { + LOG4CXX_AUTO_TRACE(ws_logger_); + // Perform the SSL handshake + WebSocketSecureSession<ExecutorType>::ws_.next_layer().async_handshake( + ssl::stream_base::server, + boost::asio::bind_executor( + WebSocketSecureSession<ExecutorType>::strand_, + std::bind(&WebSocketSecureSession::AsyncHandshake, + this->shared_from_this(), + std::placeholders::_1))); +} + +template <typename ExecutorType> +void WebSocketSecureSession<ExecutorType>::AsyncHandshake( + boost::system::error_code ec) { + LOG4CXX_AUTO_TRACE(ws_logger_); + if (ec) { + auto str_err = "ErrorMessage: " + ec.message(); + LOG4CXX_ERROR(ws_logger_, str_err); + WebSocketSession<ExecutorType>::on_io_error_(); + return; + } + + WebSocketSession<ExecutorType>::AsyncAccept(); +} + +template class WebSocketSecureSession<ssl::stream<tcp::socket&> >; + +} // namespace transport_adapter +} // namespace transport_manager diff --git a/src/components/transport_manager/src/websocket_server/websocket_server_transport_adapter.cc b/src/components/transport_manager/src/websocket_server/websocket_server_transport_adapter.cc new file mode 100644 index 0000000000..ac8789eee8 --- /dev/null +++ b/src/components/transport_manager/src/websocket_server/websocket_server_transport_adapter.cc @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2020, Ford Motor Company + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following + * disclaimer in the documentation and/or other materials provided with the + * distribution. + * + * Neither the name of the Ford Motor Company nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "transport_manager/websocket_server/websocket_server_transport_adapter.h" + +#include <errno.h> +#include <memory.h> +#include <signal.h> +#include <stdio.h> + +#include <cstdlib> +#include <sstream> + +#include "transport_manager/websocket_server/websocket_listener.h" +#include "utils/gen_hash.h" +#include "utils/logger.h" +#include "utils/threads/thread_delegate.h" + +namespace transport_manager { +namespace transport_adapter { + +CREATE_LOGGERPTR_GLOBAL(logger_, "WebSocketTransportAdapter") + +WebSocketServerTransportAdapter::WebSocketServerTransportAdapter( + resumption::LastStateWrapperPtr last_state_wrapper, + const TransportManagerSettings& settings) + : TransportAdapterImpl(nullptr, + nullptr, + new WebSocketListener(this, settings), + last_state_wrapper, + settings) {} + +WebSocketServerTransportAdapter::~WebSocketServerTransportAdapter() {} + +void WebSocketServerTransportAdapter::TransportConfigUpdated( + const TransportConfig& new_config) { + LOG4CXX_AUTO_TRACE(logger_); + + transport_config_ = new_config; + + // call the method of parent class to trigger OnTransportConfigUpdated() for + // the listeners + TransportAdapterImpl::TransportConfigUpdated(new_config); +} + +TransportConfig WebSocketServerTransportAdapter::GetTransportConfiguration() + const { + LOG4CXX_AUTO_TRACE(logger_); + return transport_config_; +} + +DeviceType WebSocketServerTransportAdapter::GetDeviceType() const { + return WEBENGINE_WEBSOCKET; +} + +DeviceSptr WebSocketServerTransportAdapter::AddDevice(DeviceSptr device) { + LOG4CXX_AUTO_TRACE(logger_); + webengine_device_ = device; + Store(); + return TransportAdapterImpl::AddDevice(webengine_device_); +} + +TransportAdapter::Error WebSocketServerTransportAdapter::Init() { + LOG4CXX_AUTO_TRACE(logger_); + if (webengine_device_) { + AddDevice(webengine_device_); + } + return TransportAdapterImpl::Init(); +} + +void WebSocketServerTransportAdapter::Store() const { + LOG4CXX_AUTO_TRACE(logger_); + if (webengine_device_) { + Json::Value& dictionary = last_state().get_dictionary(); + if (dictionary["TransportManager"].isMember("WebsocketServerAdapter")) { + LOG4CXX_DEBUG( + logger_, "WebsocketServerAdapter already exists. Storing is skipped"); + return; + } + + Json::Value device_dictionary; + device_dictionary["unique_id"] = webengine_device_->unique_device_id(); + + Json::Value ws_adapter_dictionary; + ws_adapter_dictionary["device"] = device_dictionary; + dictionary["TransportManager"]["WebsocketServerAdapter"] = + ws_adapter_dictionary; + } +} + +bool WebSocketServerTransportAdapter::Restore() { + LOG4CXX_AUTO_TRACE(logger_); + const Json::Value& dictionary = last_state().get_dictionary(); + const Json::Value ws_adapter_dictionary = + dictionary["TransportManager"]["WebsocketServerAdapter"]; + webengine_device_id_ = + ws_adapter_dictionary["device"]["unique_id"].asString(); + if (webengine_device_id_.empty()) { + srand(time(0)); + const size_t device_id_length = 64u; + webengine_device_id_ = utils::gen_hash(device_id_length); + } + return true; +} + +std::string WebSocketServerTransportAdapter::GetStoredDeviceID() const { + LOG4CXX_AUTO_TRACE(logger_); + return webengine_device_id_; +} + +} // namespace transport_adapter +} // namespace transport_manager diff --git a/src/components/transport_manager/src/websocket_server/websocket_session.cc b/src/components/transport_manager/src/websocket_server/websocket_session.cc new file mode 100644 index 0000000000..ab62530963 --- /dev/null +++ b/src/components/transport_manager/src/websocket_server/websocket_session.cc @@ -0,0 +1,162 @@ +/* +Copyright (c) 2020 Livio, Inc. +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of SmartDeviceLink Consortium, Inc. nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +#include "transport_manager/websocket_server/websocket_session.h" +#include <unistd.h> +#include "transport_manager/transport_adapter/transport_adapter_controller.h" + +namespace transport_manager { +namespace transport_adapter { + +using namespace boost::beast::websocket; + +template <> +WebSocketSession<tcp::socket&>::WebSocketSession( + boost::asio::ip::tcp::socket socket, + DataReceiveCallback data_receive, + OnIOErrorCallback on_error) + : socket_(std::move(socket)) + , ws_(socket_) + , strand_(ws_.get_executor()) + , data_receive_(data_receive) + , on_io_error_(on_error) { + ws_.binary(true); +} + +#ifdef ENABLE_SECURITY +template <> +WebSocketSession<ssl::stream<tcp::socket&> >::WebSocketSession( + boost::asio::ip::tcp::socket socket, + ssl::context& ctx, + DataReceiveCallback data_receive, + OnIOErrorCallback on_error) + : socket_(std::move(socket)) + , ws_(socket_, ctx) + , strand_(ws_.get_executor()) + , data_receive_(data_receive) + , on_io_error_(on_error) { + ws_.binary(true); +} +template class WebSocketSession<ssl::stream<tcp::socket&> >; +#endif // ENABLE_SECURITY + +template <typename ExecutorType> +WebSocketSession<ExecutorType>::~WebSocketSession() {} + +template <typename ExecutorType> +void WebSocketSession<ExecutorType>::AsyncAccept() { + LOG4CXX_AUTO_TRACE(ws_logger_); + ws_.async_accept( + boost::asio::bind_executor(strand_, + std::bind(&WebSocketSession::AsyncRead, + this->shared_from_this(), + std::placeholders::_1))); +} + +template <typename ExecutorType> +void WebSocketSession<ExecutorType>::AsyncRead(boost::system::error_code ec) { + LOG4CXX_AUTO_TRACE(ws_logger_); + if (ec) { + auto str_err = "ErrorMessage: " + ec.message(); + LOG4CXX_ERROR(ws_logger_, str_err); + return; + } + + ws_.async_read(buffer_, + boost::asio::bind_executor(strand_, + std::bind(&WebSocketSession::Read, + this->shared_from_this(), + std::placeholders::_1, + std::placeholders::_2))); +} + +template <typename ExecutorType> +TransportAdapter::Error WebSocketSession<ExecutorType>::WriteDown( + ::protocol_handler::RawMessagePtr message) { + boost::system::error_code ec; + ws_.write(boost::asio::buffer(message->data(), message->data_size()), ec); + + if (ec) { + LOG4CXX_ERROR(ws_logger_, "A system error has occurred: " << ec.message()); + return TransportAdapter::FAIL; + } + + return TransportAdapter::OK; +} + +template <typename ExecutorType> +void WebSocketSession<ExecutorType>::Read(boost::system::error_code ec, + std::size_t bytes_transferred) { + LOG4CXX_AUTO_TRACE(ws_logger_); + boost::ignore_unused(bytes_transferred); + if (ec) { + LOG4CXX_ERROR(ws_logger_, ec.message()); + buffer_.consume(buffer_.size()); + on_io_error_(); + return; + } + + auto size = buffer_.size(); + const auto data = boost::asio::buffer_cast<const uint8_t*>( + boost::beast::buffers_front(buffer_.data())); + + LOG4CXX_DEBUG(ws_logger_, + "Msg: " << boost::beast::buffers_to_string(buffer_.data()) + << " Size: " << size;); + + ::protocol_handler::RawMessagePtr frame( + new protocol_handler::RawMessage(0, 0, data, size, false)); + + data_receive_(frame); + + buffer_.consume(buffer_.size()); + AsyncRead(ec); +} + +template <typename ExecutorType> +bool WebSocketSession<ExecutorType>::Shutdown() { + LOG4CXX_AUTO_TRACE(ws_logger_); + boost::system::error_code ec; + if (socket_.is_open()) { + socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); + socket_.close(); + } + buffer_.consume(buffer_.size()); + if (ec) { + LOG4CXX_ERROR(ws_logger_, ec.message()); + return false; + } + return true; +} + +template class WebSocketSession<tcp::socket&>; + +} // namespace transport_adapter +} // namespace transport_manager |