diff options
Diffstat (limited to 'SDL_Core/src/components/transport_manager/src/transport_manager_impl.cc')
-rw-r--r-- | SDL_Core/src/components/transport_manager/src/transport_manager_impl.cc | 966 |
1 files changed, 0 insertions, 966 deletions
diff --git a/SDL_Core/src/components/transport_manager/src/transport_manager_impl.cc b/SDL_Core/src/components/transport_manager/src/transport_manager_impl.cc deleted file mode 100644 index 62a970ccf..000000000 --- a/SDL_Core/src/components/transport_manager/src/transport_manager_impl.cc +++ /dev/null @@ -1,966 +0,0 @@ -/** - * \file transport_manager_impl.cc - * \brief TransportManagerImpl class source file. - * - * Copyright (c) 2013, Ford Motor Company - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * Redistributions of source code must retain the above copyright notice, this - * list of conditions and the following disclaimer. - * - * Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following - * disclaimer in the documentation and/or other materials provided with the - * distribution. - * - * Neither the name of the Ford Motor Company nor the names of its contributors - * may be used to endorse or promote products derived from this software - * without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include <pthread.h> -#include <stdint.h> -#include <cstring> -#include <queue> -#include <set> -#include <algorithm> -#include <limits> -#include <functional> -#include <sstream> -#include "utils/macro.h" -#include "protocol_handler/raw_message.h" -#include "protocol_handler/protocol_packet.h" -#include "transport_manager/transport_manager_impl.h" -#include "transport_manager/transport_manager_listener.h" -#include "transport_manager/transport_manager_listener_empty.h" -#include "transport_manager/bluetooth/bluetooth_transport_adapter.h" -#include "transport_manager/tcp/tcp_transport_adapter.h" -#include "transport_manager/transport_adapter/transport_adapter.h" -#include "config_profile/profile.h" -#include "transport_manager/transport_adapter/transport_adapter_event.h" - -using ::transport_manager::transport_adapter::TransportAdapter; - -namespace transport_manager { - -log4cxx::LoggerPtr TransportManagerImpl::logger_ = - log4cxx::LoggerPtr(log4cxx::Logger::getLogger("TransportManager")); - -TransportManagerImpl::Connection TransportManagerImpl::convert(TransportManagerImpl::ConnectionInternal& p) { - TransportManagerImpl::Connection c; - c.application = p.application; - c.device = p.device; - c.id = p.id; - return c; -} - -class TransportManagerImpl::IncomingDataHandler { - public: - IncomingDataHandler(TransportManagerImpl* tm_impl) - : connections_data_(), tm_impl_(tm_impl) {} - - bool ProcessData(ConnectionUID connection_id, uint8_t* data, - std::size_t size) { - LOG4CXX_TRACE(logger_, "Start of processing incoming data of size " - << size << " for connection " << connection_id); - const uint32_t kBytesForSizeDetection = 8; - ConnectionsData::iterator it = connections_data_.find(connection_id); - if (connections_data_.end() == it) { - LOG4CXX_ERROR(logger_, "ProcessData requested for unknown connection"); - return false; - } - std::vector<uint8_t>& connection_data = it->second; - connection_data.insert(connection_data.end(), data, data + size); - - LOG4CXX_TRACE(logger_, "Total data size for connection " - << connection_id << " is " - << connection_data.size()); - while (connection_data.size() >= kBytesForSizeDetection) { - const uint32_t packet_size = - tm_impl_->GetPacketSize(kBytesForSizeDetection, &connection_data[0]); - if (0 == packet_size) { - LOG4CXX_ERROR(logger_, "Failed to get packet size"); - return false; - } - LOG4CXX_TRACE(logger_, "Packet size " << packet_size); - if (connection_data.size() >= packet_size) { - RawMessageSptr raw_message(new protocol_handler::RawMessage( - connection_id, 0, // It's not up to TM to know protocol version - &connection_data[0], packet_size)); - tm_impl_->RaiseEvent(&TransportManagerListener::OnTMMessageReceived, - raw_message); - connection_data.erase(connection_data.begin(), - connection_data.begin() + packet_size); - LOG4CXX_TRACE(logger_, - "Packet created and passed, new data size for connection " - << connection_id << " is " << connection_data.size()); - } else { - LOG4CXX_TRACE(logger_, "Packet data is not available yet"); - return true; - } - } - return true; - } - - void AddConnection(ConnectionUID connection_id) { - connections_data_[connection_id]; - } - - void RemoveConnection(ConnectionUID connection_id) { - connections_data_.erase(connection_id); - } - - private: - typedef std::map<ConnectionUID, std::vector<uint8_t> > ConnectionsData; - ConnectionsData connections_data_; - TransportManagerImpl* tm_impl_; -}; - -TransportManagerImpl::TransportManagerImpl() - : message_queue_mutex_(), - all_thread_active_(false), - message_queue_thread_(), - event_queue_thread_(), - device_listener_thread_wakeup_(), - is_initialized_(false), - connection_id_counter_(0), - incoming_data_handler_(new IncomingDataHandler(this)) { - LOG4CXX_INFO(logger_, "=============================================="); -#ifdef USE_RWLOCK - pthread_rwlock_init(&message_queue_rwlock_, NULL); - pthread_rwlock_init(&event_queue_rwlock_, NULL); -#endif - pthread_mutex_init(&message_queue_mutex_, NULL); - pthread_cond_init(&message_queue_cond_, NULL); - pthread_mutex_init(&event_queue_mutex_, 0); - pthread_cond_init(&device_listener_thread_wakeup_, NULL); - LOG4CXX_INFO(logger_, "TransportManager object created"); -} - -TransportManagerImpl::~TransportManagerImpl() { - LOG4CXX_INFO(logger_, "TransportManager object destroyed"); - - for (std::vector<TransportAdapter*>::iterator it = - transport_adapters_.begin(); - it != transport_adapters_.end(); ++it) { - delete *it; - } - - for (std::map<TransportAdapter*, TransportAdapterListenerImpl*>::iterator it = - transport_adapter_listeners_.begin(); - it != transport_adapter_listeners_.end(); ++it) { - delete it->second; - } - - pthread_mutex_destroy(&message_queue_mutex_); - pthread_cond_destroy(&message_queue_cond_); - pthread_mutex_destroy(&event_queue_mutex_); - pthread_cond_destroy(&device_listener_thread_wakeup_); -} - -int TransportManagerImpl::ConnectDevice(const DeviceHandle& device_handle) { - LOG4CXX_INFO(logger_, "Connect device called with arguments device_handle " - << device_handle); - if (!this->is_initialized_) { - LOG4CXX_ERROR(logger_, "TransportManager is not initialized."); - return E_TM_IS_NOT_INITIALIZED; - } - - DeviceUID device_id = converter_.HandleToUid(device_handle); - LOG4CXX_INFO(logger_, "Convert handle to id " << device_id); - - transport_adapter::TransportAdapter* ta = device_to_adapter_map_[device_id]; - if (NULL == ta) { - LOG4CXX_ERROR(logger_, "No device adapter found by id " << device_id); - return E_INVALID_HANDLE; - } - - TransportAdapter::Error ta_error = ta->ConnectDevice(device_id); - return (TransportAdapter::OK == ta_error) ? E_SUCCESS : E_INTERNAL_ERROR; -} - -int TransportManagerImpl::DisconnectDevice(const DeviceHandle& device_handle) { - LOG4CXX_INFO(logger_, "Disconnect device called"); - if (!this->is_initialized_) { - LOG4CXX_ERROR(logger_, "TransportManager is not initialized."); - return E_TM_IS_NOT_INITIALIZED; - } - DeviceUID device_id = converter_.HandleToUid(device_handle); - LOG4CXX_INFO(logger_, "Convert handle to id" << device_id); - - transport_adapter::TransportAdapter* ta = device_to_adapter_map_[device_id]; - if (NULL == ta) { - LOG4CXX_ERROR(logger_, "No device adapter found by id " << device_id); - return E_INVALID_HANDLE; - } - ta->DisconnectDevice(device_id); - LOG4CXX_INFO(logger_, "Disconnected"); - return E_SUCCESS; -} - -int TransportManagerImpl::Disconnect(const ConnectionUID& cid) { - if (!this->is_initialized_) { - LOG4CXX_ERROR(logger_, "TransportManager is not initialized."); - return E_TM_IS_NOT_INITIALIZED; - } - - ConnectionInternal* connection = GetConnection(cid); - if (connection == NULL) { - LOG4CXX_ERROR( - logger_, - "TransportManagerImpl::Disconnect: Connection does not exist."); - return E_INVALID_HANDLE; - } - - pthread_mutex_lock(&event_queue_mutex_); - int messages_count = 0; - for (EventQueue::const_iterator it = event_queue_.begin(); - it != event_queue_.end(); - ++it) { - if (it->application_id() == cid) { - ++messages_count; - } - } - pthread_mutex_unlock(&event_queue_mutex_); - - if (messages_count > 0) { - connection->messages_count = messages_count; - connection->shutDown = true; - connection->timer->start( - profile::Profile::instance()->transport_manager_disconnect_timeout() - ); - } else { - connection->transport_adapter->Disconnect(connection->device, - connection->application); - } - return E_SUCCESS; -} - -int TransportManagerImpl::DisconnectForce(const ConnectionUID& cid) { - if (false == this->is_initialized_) { - LOG4CXX_ERROR(logger_, "TransportManager is not initialized."); - return E_TM_IS_NOT_INITIALIZED; - } - pthread_mutex_lock(&event_queue_mutex_); - // Clear messages for this connection - // Note that MessageQueue typedef is assumed to be std::list, - // or there is a problem here. One more point versus typedefs-everywhere - MessageQueue::iterator e = message_queue_.begin(); - while (e != message_queue_.end()) { - if ((*e)->connection_key() == cid) { - RaiseEvent(&TransportManagerListener::OnTMMessageSendFailed, - DataSendTimeoutError(), *e); - e = message_queue_.erase(e); - } else { - ++e; - } - } - pthread_mutex_unlock(&event_queue_mutex_); - const ConnectionInternal* connection = GetConnection(cid); - if (connection == NULL) { - LOG4CXX_ERROR( - logger_, - "TransportManagerImpl::DisconnectForce: Connection does not exist."); - return E_INVALID_HANDLE; - } - connection->transport_adapter->Disconnect(connection->device, - connection->application); - return E_SUCCESS; -} - -int TransportManagerImpl::AddEventListener(TransportManagerListener* listener) { - transport_manager_listener_.push_back(listener); - return E_SUCCESS; -} - -int TransportManagerImpl::Stop() { - if (!all_thread_active_) return E_TM_IS_NOT_INITIALIZED; - - all_thread_active_ = false; - - pthread_mutex_lock(&event_queue_mutex_); - pthread_cond_signal(&device_listener_thread_wakeup_); - pthread_mutex_unlock(&event_queue_mutex_); - - pthread_mutex_lock(&message_queue_mutex_); - pthread_cond_signal(&message_queue_cond_); - pthread_mutex_unlock(&message_queue_mutex_); - - pthread_join(message_queue_thread_, 0); - pthread_join(event_queue_thread_, 0); - - LOG4CXX_INFO(logger_, "TransportManager object stopped"); - - return E_SUCCESS; -} - -int TransportManagerImpl::SendMessageToDevice(const RawMessageSptr message) { - LOG4CXX_INFO(logger_, "Send message to device called with arguments " - << message.get()); - if (false == this->is_initialized_) { - LOG4CXX_ERROR(logger_, "TM is not initialized."); - return E_TM_IS_NOT_INITIALIZED; - } - - const ConnectionInternal* connection = - GetConnection(message->connection_key()); - if (connection == NULL) { - LOG4CXX_ERROR(logger_, "Connection with id " << message->connection_key() - << " does not exist."); - return E_INVALID_HANDLE; - } - - if (connection->shutDown) { - LOG4CXX_ERROR( - logger_, - "TransportManagerImpl::Disconnect: Connection is to shut down."); - return E_CONNECTION_IS_TO_SHUTDOWN; - } - - this->PostMessage(message); - LOG4CXX_INFO(logger_, "Message posted"); - return E_SUCCESS; -} - -int TransportManagerImpl::ReceiveEventFromDevice( - const TransportAdapterEvent& event) { - if (false == this->is_initialized_) { - LOG4CXX_ERROR(logger_, "TM is not initialized."); - return E_TM_IS_NOT_INITIALIZED; - } - this->PostEvent(event); - return E_SUCCESS; -} - -int TransportManagerImpl::RemoveDevice(const DeviceHandle& device_handle) { - DeviceUID device_id = converter_.HandleToUid(device_handle); - if (false == this->is_initialized_) { - LOG4CXX_ERROR(logger_, "TM is not initialized."); - return E_TM_IS_NOT_INITIALIZED; - } - device_to_adapter_map_.erase(device_id); - return E_SUCCESS; -} - -int TransportManagerImpl::AddTransportAdapter( - transport_adapter::TransportAdapter* transport_adapter) { - LOG4CXX_INFO(logger_, "Add device adapter " - << transport_adapter << "[" - << transport_adapter->GetDeviceType() << "]"); - - if (transport_adapter_listeners_.find(transport_adapter) != - transport_adapter_listeners_.end()) { - LOG4CXX_ERROR(logger_, "Adapter already exists."); - return E_ADAPTER_EXISTS; - } - transport_adapter_listeners_[transport_adapter] = - new TransportAdapterListenerImpl(this, transport_adapter); - transport_adapter->AddListener( - transport_adapter_listeners_[transport_adapter]); - - if (transport_adapter->IsInitialised() || - transport_adapter->Init() == TransportAdapter::OK) { - transport_adapters_.push_back(transport_adapter); - } - - return E_SUCCESS; -} - -int TransportManagerImpl::SearchDevices(void) { - if (!this->is_initialized_) { - LOG4CXX_ERROR(logger_, "TM is not initialized"); - return E_TM_IS_NOT_INITIALIZED; - } - - LOG4CXX_INFO(logger_, "Search device called"); - - bool success_occured = false; - - for (std::vector<TransportAdapter*>::iterator it = - transport_adapters_.begin(); - it != transport_adapters_.end(); ++it) { - LOG4CXX_INFO(logger_, "Iterating over transport adapters"); - TransportAdapter::Error scanResult = (*it)->SearchDevices(); - if (transport_adapter::TransportAdapter::OK == scanResult) { - success_occured = true; - } - else { - LOG4CXX_ERROR(logger_, "Transport Adapter search failed " - << *it << "[" << (*it)->GetDeviceType() - << "]"); - switch (scanResult) { - case transport_adapter::TransportAdapter::NOT_SUPPORTED: { - LOG4CXX_ERROR(logger_, "Search feature is not supported " - << *it << "[" << (*it)->GetDeviceType() - << "]"); - break; - } - case transport_adapter::TransportAdapter::BAD_STATE: { - LOG4CXX_ERROR(logger_, "Transport Adapter has bad state " - << *it << "[" << (*it)->GetDeviceType() - << "]"); - break; - } - } - } - } - - LOG4CXX_INFO(logger_, "SearchDevices() function is complete"); - - return (success_occured || transport_adapters_.empty()) - ? E_SUCCESS - : E_ADAPTERS_FAIL; -} - -int TransportManagerImpl::Init(void) { - LOG4CXX_INFO(logger_, "Init is called"); - all_thread_active_ = true; - - int error_code = - pthread_create(&message_queue_thread_, 0, &MessageQueueStartThread, this); - - if (0 != error_code) { - LOG4CXX_ERROR(logger_, - "Message queue thread is not created exit with error code " - << error_code); - return E_TM_IS_NOT_INITIALIZED; - } - - error_code = - pthread_create(&event_queue_thread_, 0, &EventListenerStartThread, this); - - if (0 != error_code) { - LOG4CXX_ERROR(logger_, - "Event queue thread is not created exit with error code " - << error_code); - return E_TM_IS_NOT_INITIALIZED; - } - - is_initialized_ = true; - LOG4CXX_INFO(logger_, "Init complete"); - return E_SUCCESS; -} - -int TransportManagerImpl::Visibility(const bool& on_off) const { - TransportAdapter::Error ret; - - LOG4CXX_INFO(logger_, "Visibility change requested to " << on_off); - if (false == this->is_initialized_) { - LOG4CXX_ERROR(logger_, "TM is not initialized"); - return E_TM_IS_NOT_INITIALIZED; - } - - for (std::vector<TransportAdapter*>::const_iterator it = - transport_adapters_.begin(); - it != transport_adapters_.end(); ++it) { - if (on_off) { - ret = (*it)->StartClientListening(); - } else { - ret = (*it)->StopClientListening(); - } - if (TransportAdapter::Error::NOT_SUPPORTED == ret) { - LOG4CXX_INFO(logger_, "Visibility change is not supported for adapter " - << *it << "[" << (*it)->GetDeviceType() << "]"); - } - } - - LOG4CXX_INFO(logger_, "Visibility change requested complete"); - return E_SUCCESS; -} - -void TransportManagerImpl::UpdateDeviceList(TransportAdapter* ta) { - std::set<DeviceInfo> old_devices; - for (DeviceList::iterator it = device_list_.begin(); - it != device_list_.end();) { - if (it->first == ta) { - old_devices.insert(it->second); - it = device_list_.erase(it); - } else { - ++it; - } - } - - std::set<DeviceInfo> new_devices; - const transport_adapter::DeviceList dev_list = ta->GetDeviceList(); - for (transport_adapter::DeviceList::const_iterator it = dev_list.begin(); - it != dev_list.end(); ++it) { - DeviceHandle device_handle = converter_.UidToHandle(*it); - DeviceInfo info(device_handle, *it, ta->DeviceName(*it)); - device_list_.push_back(std::make_pair(ta, info)); - new_devices.insert(info); - } - - std::set<DeviceInfo> added_devices; - std::set_difference(new_devices.begin(), new_devices.end(), - old_devices.begin(), old_devices.end(), - std::inserter(added_devices, added_devices.begin())); - for(std::set<DeviceInfo>::const_iterator it = added_devices.begin(); - it != added_devices.end(); - ++it) { - RaiseEvent(&TransportManagerListener::OnDeviceAdded, *it); - } - - std::set<DeviceInfo> removed_devices; - std::set_difference(old_devices.begin(), old_devices.end(), - new_devices.begin(), new_devices.end(), - std::inserter(removed_devices, removed_devices.begin())); - - for(std::set<DeviceInfo>::const_iterator it = removed_devices.begin(); - it != removed_devices.end(); - ++it) { - RaiseEvent(&TransportManagerListener::OnDeviceRemoved, *it); - } -} - -void TransportManagerImpl::PostMessage(const RawMessageSptr message) { - LOG4CXX_INFO(logger_, "Post message called serial number " << message.get()); - -#ifdef USE_RWLOCK - pthread_rwlock_wrlock(&message_queue_rwlock_); -#else - pthread_mutex_lock(&message_queue_mutex_); -#endif - message_queue_.push_back(message); - pthread_cond_signal(&message_queue_cond_); -#ifdef USE_RWLOCK - pthread_rwlock_unlock(&message_queue_rwlock_); -#else - pthread_mutex_unlock(&message_queue_mutex_); -#endif - LOG4CXX_INFO(logger_, "Post message complete"); -} - -void TransportManagerImpl::RemoveMessage(const RawMessageSptr message) { - // TODO: Reconsider necessity of this method, remove if it's useless, - // make to work otherwise. - // 2013-08-21 dchmerev@luxoft.com - LOG4CXX_INFO(logger_, "Remove message called " << message.get()); -#ifdef USE_RWLOCK - pthread_rwlock_wrlock(&message_queue_rwlock_); -#else - pthread_mutex_lock(&message_queue_mutex_); -#endif - std::remove(message_queue_.begin(), message_queue_.end(), message); -#ifdef USE_RWLOCK - pthread_rwlock_unlock(&message_queue_rwlock_); -#else - pthread_mutex_unlock(&message_queue_mutex_); -#endif - LOG4CXX_INFO(logger_, "Remove message from queue complete"); -} - -void TransportManagerImpl::PostEvent(const TransportAdapterEvent& event) { -#ifdef USE_RWLOCK - pthread_rwlock_wrlock(&event_queue_rwlock_); -#else - pthread_mutex_lock(&event_queue_mutex_); -#endif - event_queue_.push_back(event); - pthread_cond_signal(&device_listener_thread_wakeup_); -#ifdef USE_RWLOCK - pthread_rwlock_unlock(&event_queue_rwlock_); -#else - pthread_mutex_unlock(&event_queue_mutex_); -#endif -} - -void* TransportManagerImpl::EventListenerStartThread(void* data) { - if (NULL != data) { - static_cast<TransportManagerImpl*>(data)->EventListenerThread(); - } - return 0; -} - -void TransportManagerImpl::AddConnection(const ConnectionInternal& c) { - connections_.push_back(c); - incoming_data_handler_->AddConnection(c.id); -} - -void TransportManagerImpl::RemoveConnection(int id) { - for (std::vector<ConnectionInternal>::iterator it = connections_.begin(); - it != connections_.end(); ++it) { - if (it->id == id) { - connections_.erase(it); - break; - } - } - incoming_data_handler_->RemoveConnection(id); -} - -TransportManagerImpl::ConnectionInternal* TransportManagerImpl::GetConnection( - const ConnectionUID& id) { - for (std::vector<ConnectionInternal>::iterator it = connections_.begin(); it != connections_.end(); ++it) { - if (it->id == id) { - return &*it; - } - } - return NULL; -} - -TransportManagerImpl::ConnectionInternal* TransportManagerImpl::GetConnection( - const DeviceUID& device, const ApplicationHandle& application) { - for (std::vector<ConnectionInternal>::iterator it = connections_.begin(); it != connections_.end(); ++it) { - if (it->device == device && it->application == application) { - return &*it; - } - } - return NULL; -} - -// TODO this function should be moved outside of TM to protocol handler or -// somewhere else -unsigned int TransportManagerImpl::GetPacketSize(unsigned int data_size, - unsigned char* first_bytes) { - DCHECK(first_bytes); - unsigned char offset = sizeof(uint32_t); - if (data_size < 2 * offset) { - LOG4CXX_ERROR(logger_, "Received bytes are not enough to parse fram size."); - return 0; - } - - unsigned char* received_bytes = first_bytes; - DCHECK(received_bytes); - - unsigned char version = received_bytes[0] >> 4u; - uint32_t frame_body_size = received_bytes[offset++] << 24u; - frame_body_size |= received_bytes[offset++] << 16u; - frame_body_size |= received_bytes[offset++] << 8u; - frame_body_size |= received_bytes[offset++]; - - unsigned int required_size = frame_body_size; - switch (version) { - case protocol_handler::PROTOCOL_VERSION_1: - required_size += protocol_handler::PROTOCOL_HEADER_V1_SIZE; - break; - case protocol_handler::PROTOCOL_VERSION_2: - required_size += protocol_handler::PROTOCOL_HEADER_V2_SIZE; - break; - default: - LOG4CXX_ERROR(logger_, "Unknown protocol version."); - return 0; - } - - return required_size; -} - -void TransportManagerImpl::OnDeviceListUpdated(TransportAdapter* ta) { - const transport_adapter::DeviceList device_list = ta->GetDeviceList(); - LOG4CXX_INFO(logger_, "DEVICE_LIST_UPDATED " << device_list.size()); - for (transport_adapter::DeviceList::const_iterator it = device_list.begin(); - it != device_list.end(); ++it) { - device_to_adapter_map_.insert(std::make_pair(*it, ta)); - DeviceHandle device_handle = converter_.UidToHandle(*it); - DeviceInfo info(device_handle, *it, ta->DeviceName(*it)); - RaiseEvent(&TransportManagerListener::OnDeviceFound, info); - } - UpdateDeviceList(ta); - std::vector<DeviceInfo> device_infos; - for (DeviceList::const_iterator it = device_list_.begin(); - it != device_list_.end(); ++it) { - device_infos.push_back(it->second); - } - RaiseEvent(&TransportManagerListener::OnDeviceListUpdated, device_infos); -} - -void TransportManagerImpl::EventListenerThread(void) { -#ifndef USE_RWLOCK - pthread_mutex_lock(&event_queue_mutex_); -#endif - LOG4CXX_INFO(logger_, "Event listener thread started"); - while (true) { - while (event_queue_.size() > 0) { -#ifdef USE_RWLOCK - pthread_rwlock_rdlock(&event_queue_rwlock_); -#endif - LOG4CXX_INFO(logger_, "Event listener queue pushed to process events"); - EventQueue::iterator current = event_queue_.begin(); - TransportAdapter* ta = current->transport_adapter(); - ApplicationHandle app_handle = current->application_id(); - DeviceUID device_id = current->device_uid(); - DeviceHandle device_handle; - BaseError* error = current->event_error(); - RawMessageSptr data = current->data(); - - int event_type = current->event_type(); - event_queue_.erase(current); -#ifdef USE_RWLOCK - pthread_rwlock_unlock(&event_queue_rwlock_); -#else - pthread_mutex_unlock(&event_queue_mutex_); -#endif - transport_adapter::DeviceList dev_list; - ConnectionInternal* connection = GetConnection(device_id, app_handle); - std::vector<DeviceInfo>::iterator device_info_iterator; - - switch (event_type) { - case TransportAdapterListenerImpl::EventTypeEnum::ON_SEARCH_DONE: { - LOG4CXX_INFO(logger_, "Event ON_SEARCH_DONE"); - RaiseEvent(&TransportManagerListener::OnScanDevicesFinished); - break; - } - case TransportAdapterListenerImpl::EventTypeEnum::ON_SEARCH_FAIL: { - LOG4CXX_INFO(logger_, "Event ON_SEARCH_FAIL"); - // error happened in real search process (external error) - RaiseEvent(&TransportManagerListener::OnScanDevicesFailed, - *static_cast<SearchDeviceError*>(error)); - break; - } - case TransportAdapterListenerImpl::EventTypeEnum::ON_DEVICE_LIST_UPDATED - : { - OnDeviceListUpdated(ta); - break; - } - case TransportAdapterListenerImpl::EventTypeEnum::ON_CONNECT_DONE: { - LOG4CXX_INFO(logger_, "Event ON_CONNECT_DONE"); - AddConnection(ConnectionInternal(this, ta, ++connection_id_counter_, - device_id, app_handle)); - device_handle = converter_.UidToHandle(device_id); - RaiseEvent( - &TransportManagerListener::OnConnectionEstablished, - DeviceInfo(device_handle, device_id, ta->DeviceName(device_id)), - connection_id_counter_); - break; - } - case TransportAdapterListenerImpl::EventTypeEnum::ON_CONNECT_FAIL: { - LOG4CXX_INFO(logger_, "Event ON_CONNECT_FAIL"); - RaiseEvent(&TransportManagerListener::OnConnectionFailed, - DeviceInfo(converter_.UidToHandle(device_id), device_id, - ta->DeviceName(device_id)), - ConnectError()); - break; - } - case TransportAdapterListenerImpl::EventTypeEnum::ON_DISCONNECT_DONE: { - LOG4CXX_INFO(logger_, "Event ON_DISCONNECT_DONE"); - if (connection == NULL) { - LOG4CXX_ERROR(logger_, "Connection ('" << device_id << ", " - << app_handle - << ") not found"); - break; - } - RaiseEvent(&TransportManagerListener::OnConnectionClosed, - connection->id); - RemoveConnection(connection->id); - break; - } - case TransportAdapterListenerImpl::EventTypeEnum::ON_DISCONNECT_FAIL: { - LOG4CXX_INFO(logger_, "Event ON_DISCONNECT_FAIL"); - device_handle = converter_.UidToHandle(device_id); - RaiseEvent(&TransportManagerListener::OnDisconnectFailed, - device_handle, DisconnectDeviceError()); - break; - } - case TransportAdapterListenerImpl::EventTypeEnum::ON_SEND_DONE: { - LOG4CXX_INFO(logger_, "Event ON_SEND_DONE"); - if (connection == NULL) { - LOG4CXX_ERROR(logger_, "Connection ('" << device_id << ", " - << app_handle - << ") not found"); - break; - } - RaiseEvent(&TransportManagerListener::OnTMMessageSend, data); - this->RemoveMessage(data); - if (connection->shutDown && --connection->messages_count == 0) { - connection->timer->stop(); - connection->transport_adapter->Disconnect(connection->device, - connection->application); - } - break; - } - case TransportAdapterListenerImpl::EventTypeEnum::ON_SEND_FAIL: { - LOG4CXX_INFO(logger_, "Event ON_SEND_FAIL"); - if (connection == NULL) { - LOG4CXX_ERROR(logger_, "Connection ('" << device_id << ", " - << app_handle - << ") not found"); - break; - } - - // TODO(YK): start timer here to wait before notify caller - // and remove unsent messages - LOG4CXX_ERROR(logger_, "Transport adapter failed to send data"); - // TODO(YK): potential error case -> thread unsafe - // update of message content - data->set_waiting(true); - if (data.valid()) { - data->set_waiting(true); - } else { - LOG4CXX_ERROR(logger_, "Data is invalid"); - } - break; - } - case TransportAdapterListenerImpl::EventTypeEnum::ON_RECEIVED_DONE: { - LOG4CXX_INFO(logger_, "Event ON_RECEIVED_DONE"); - if (connection == NULL) { - LOG4CXX_ERROR(logger_, "Connection ('" << device_id << ", " - << app_handle - << ") not found"); - break; - } - const bool ok = incoming_data_handler_->ProcessData( - connection->id, data->data(), data->data_size()); - if (!ok) { - LOG4CXX_ERROR( - logger_, - "Incoming data processing failed. Terminating connection."); - DisconnectForce(connection->id); - } - break; - } - case TransportAdapterListenerImpl::EventTypeEnum::ON_RECEIVED_FAIL: { - LOG4CXX_INFO(logger_, "Event ON_RECEIVED_FAIL"); - if (connection == NULL) { - LOG4CXX_ERROR(logger_, "Connection ('" << device_id << ", " - << app_handle - << ") not found"); - break; - } - - RaiseEvent(&TransportManagerListener::OnTMMessageReceiveFailed, - connection->id, *static_cast<DataReceiveError*>(error)); - break; - } - case TransportAdapterListenerImpl::EventTypeEnum::ON_COMMUNICATION_ERROR - : { - LOG4CXX_INFO(logger_, "Event ON_COMMUNICATION_ERROR"); - break; - } - case TransportAdapterListenerImpl::EventTypeEnum:: - ON_UNEXPECTED_DISCONNECT: { - LOG4CXX_INFO(logger_, "Event ON_UNEXPECTED_DISCONNECT"); - if (connection) { - RaiseEvent(&TransportManagerListener::OnUnexpectedDisconnect, - connection->id, - *static_cast<CommunicationError*>(error)); - } else { - LOG4CXX_ERROR(logger_, "Connection ('" << device_id << ", " - << app_handle - << ") not found"); - } - break; - } - } // switch - delete error; -#ifndef USE_RWLOCK - pthread_mutex_lock(&event_queue_mutex_); -#endif - } // while (event_queue_.size() > 0) - - if (all_thread_active_) - pthread_cond_wait(&device_listener_thread_wakeup_, &event_queue_mutex_); - else - break; - } // while (true) - - pthread_mutex_unlock(&event_queue_mutex_); - - LOG4CXX_INFO(logger_, "Event listener thread finished"); -} -void* TransportManagerImpl::MessageQueueStartThread(void* data) { - if (NULL != data) { - static_cast<TransportManagerImpl*>(data)->MessageQueueThread(); - } - return 0; -} - -void TransportManagerImpl::MessageQueueThread(void) { - LOG4CXX_INFO(logger_, "Message queue thread started"); - -#ifndef USE_RWLOCK - pthread_mutex_lock(&message_queue_mutex_); -#endif - - while (all_thread_active_) { - // TODO(YK): add priority processing - - while (message_queue_.size() > 0) { -#ifdef USE_RWLOCK - pthread_rwlock_rdlock(&message_queue_rwlock_); -#endif - MessageQueue::iterator it = message_queue_.begin(); - while (it != message_queue_.end() && it->valid() && (*it)->IsWaiting()) { - ++it; - } - if (it == message_queue_.end()) { -#ifdef USE_RWLOCK - pthread_rwlock_unlock(&message_queue_rwlock_); -#endif - break; - } - RawMessageSptr active_msg = *it; -#ifdef USE_RWLOCK - pthread_rwlock_unlock(&message_queue_rwlock_); -#else - pthread_mutex_unlock(&message_queue_mutex_); -#endif - if (active_msg.valid() && !active_msg->IsWaiting()) { - ConnectionInternal* connection = - GetConnection(active_msg->connection_key()); - if (connection == NULL) { - std::stringstream ss; - ss << "Connection " << active_msg->connection_key() << " not found"; - LOG4CXX_ERROR(logger_, ss.str()); - RaiseEvent(&TransportManagerListener::OnTMMessageSendFailed, - DataSendError(ss.str()), active_msg); - message_queue_.remove(active_msg); - continue; - } - TransportAdapter* transport_adapter = connection->transport_adapter; - LOG4CXX_INFO(logger_, "Got adapter " - << transport_adapter << "[" - << transport_adapter->GetDeviceType() << "]" - << " by session id " - << active_msg->connection_key()); - - if (NULL == transport_adapter) { - std::string error_text = - "Transport adapter is not found - message removed"; - LOG4CXX_ERROR(logger_, error_text); - RaiseEvent(&TransportManagerListener::OnTMMessageSendFailed, - DataSendError(error_text), active_msg); - message_queue_.remove(active_msg); - } else { - if (TransportAdapter::OK == - transport_adapter->SendData( - connection->device, connection->application, active_msg)) { - LOG4CXX_INFO(logger_, "Data sent to adapter"); - active_msg->set_waiting(true); - } else { - LOG4CXX_ERROR(logger_, "Data sent error"); - RaiseEvent(&TransportManagerListener::OnTMMessageSendFailed, - DataSendError("Send failed - message removed"), - active_msg); - message_queue_.remove(active_msg); - } - } - } -#ifndef USE_RWLOCK - pthread_mutex_lock(&message_queue_mutex_); -#endif - } - pthread_cond_wait(&message_queue_cond_, &message_queue_mutex_); - } // while(true) - - message_queue_.clear(); - - pthread_mutex_unlock(&message_queue_mutex_); - LOG4CXX_INFO(logger_, "Message queue thread finished"); -} - -} // namespace transport_manager - - |