diff options
Diffstat (limited to 'src/CommonAPI')
-rw-r--r-- | src/CommonAPI/DBus/DBusAddressTranslator.cpp | 39 | ||||
-rw-r--r-- | src/CommonAPI/DBus/DBusConnection.cpp | 235 | ||||
-rw-r--r-- | src/CommonAPI/DBus/DBusDaemonProxy.cpp | 10 | ||||
-rw-r--r-- | src/CommonAPI/DBus/DBusFactory.cpp | 206 | ||||
-rw-r--r-- | src/CommonAPI/DBus/DBusInstanceAvailabilityStatusChangedEvent.cpp | 269 | ||||
-rw-r--r-- | src/CommonAPI/DBus/DBusMainLoopContext.cpp | 117 | ||||
-rw-r--r-- | src/CommonAPI/DBus/DBusOutputStream.cpp | 9 | ||||
-rw-r--r-- | src/CommonAPI/DBus/DBusProxy.cpp | 98 | ||||
-rw-r--r-- | src/CommonAPI/DBus/DBusProxyManager.cpp | 142 | ||||
-rw-r--r-- | src/CommonAPI/DBus/DBusServiceRegistry.cpp | 599 | ||||
-rw-r--r-- | src/CommonAPI/DBus/DBusStubAdapter.cpp | 3 |
11 files changed, 886 insertions, 841 deletions
diff --git a/src/CommonAPI/DBus/DBusAddressTranslator.cpp b/src/CommonAPI/DBus/DBusAddressTranslator.cpp index cb208c0..8f67aa2 100644 --- a/src/CommonAPI/DBus/DBusAddressTranslator.cpp +++ b/src/CommonAPI/DBus/DBusAddressTranslator.cpp @@ -67,9 +67,10 @@ DBusAddressTranslator::translate(const CommonAPI::Address &_key, DBusAddress &_v _value = it->second; } else if (isDefault_) { std::string interfaceName(_key.getInterface()); + std::replace(interfaceName.begin(), interfaceName.end(), ':', '.'); std::string objectPath("/" + _key.getInstance()); std::replace(objectPath.begin(), objectPath.end(), '.', '/'); - std::string service(_key.getInterface() + "_" + _key.getInstance()); + std::string service(interfaceName + "_" + _key.getInstance()); if (isValid(service, '.', false, false, true) && isValid(objectPath, '/', true) @@ -81,6 +82,11 @@ DBusAddressTranslator::translate(const CommonAPI::Address &_key, DBusAddress &_v forwards_.insert({ _key, _value }); backwards_.insert({ _value, _key }); } + else { + COMMONAPI_ERROR( + "Translation from CommonAPI address to DBus address failed!"); + result = false; + } } else { result = false; } @@ -101,6 +107,9 @@ DBusAddressTranslator::translate(const DBusAddress &_key, std::string &_value) { bool DBusAddressTranslator::translate(const DBusAddress &_key, CommonAPI::Address &_value) { bool result(true); + std::size_t itsInterfacePos; + std::string itsVersion; + bool isValidVersion(true); std::lock_guard<std::mutex> itsLock(mutex_); const auto it = backwards_.find(_key); @@ -109,6 +118,34 @@ DBusAddressTranslator::translate(const DBusAddress &_key, CommonAPI::Address &_v } else if (isDefault_) { if (isValid(_key.getObjectPath(), '/', true) && isValid(_key.getInterface(), '.')) { std::string interfaceName(_key.getInterface()); + itsInterfacePos = interfaceName.rfind('.'); + itsInterfacePos++; + if( itsInterfacePos != std::string::npos + && ( interfaceName.length() - itsInterfacePos >= 4) ) { + itsVersion = interfaceName.substr(itsInterfacePos); + if( itsVersion != "" ) { + std::size_t itsSeparatorPos = itsVersion.find('_'); + if (itsSeparatorPos == std::string::npos) { + isValidVersion = false; + } + if(isValidVersion) { + if( *(itsVersion.begin()) != 'v') { + isValidVersion = false; + } + if(isValidVersion) { + for (auto it = itsVersion.begin()+1; it != itsVersion.end(); ++it) { + if (!isdigit(*it) && *it != '_') { + isValidVersion = false; + break; + } + } + } + if( isValidVersion ) { + interfaceName.replace(itsInterfacePos - 1, 1, ":"); + } + } + } + } std::string instance(_key.getObjectPath().substr(1)); std::replace(instance.begin(), instance.end(), '/', '.'); diff --git a/src/CommonAPI/DBus/DBusConnection.cpp b/src/CommonAPI/DBus/DBusConnection.cpp index 6071830..23967a9 100644 --- a/src/CommonAPI/DBus/DBusConnection.cpp +++ b/src/CommonAPI/DBus/DBusConnection.cpp @@ -20,6 +20,22 @@ namespace CommonAPI { namespace DBus { +void MsgReplyQueueEntry::process(std::shared_ptr<DBusConnection> _connection) { + _connection->dispatchDBusMessageReply(message_, replyAsyncHandler_); +} + +void MsgReplyQueueEntry::clear() { + delete replyAsyncHandler_; +} + +void MsgQueueEntry::process(std::shared_ptr<DBusConnection> _connection) { + (void)_connection; +} + +void MsgQueueEntry::clear() { + +} + DBusConnectionStatusEvent::DBusConnectionStatusEvent(DBusConnection* dbusConnection): dbusConnection_(dbusConnection) { } @@ -52,6 +68,7 @@ DBusConnection::DBusConnection(DBusType_t busType, dispatchThread_(NULL), dispatchSource_(), watchContext_(NULL), + timeoutContext_(NULL), connection_(NULL), busType_(busType), dbusConnectionStatusEvent_(this), @@ -74,6 +91,7 @@ DBusConnection::DBusConnection(::DBusConnection *_connection, dispatchThread_(NULL), dispatchSource_(), watchContext_(NULL), + timeoutContext_(NULL), connection_(_connection), busType_(DBusType_t::WRAPPED), dbusConnectionStatusEvent_(this), @@ -86,7 +104,7 @@ DBusConnection::DBusConnection(::DBusConnection *_connection, activeConnections_(0), isDisconnecting_(false), isDispatching_(false), - isWaitingOnFinishedDispatching_(false){ + isWaitingOnFinishedDispatching_(false) { dbus_threads_init_default(); } @@ -106,10 +124,11 @@ DBusConnection::~DBusConnection() { dbus_connection_set_timeout_functions(connection_, NULL, NULL, NULL, NULL, NULL); } - lockedContext->deregisterWatch(msgWatch_); - lockedContext->deregisterDispatchSource(msgDispatchSource_); + lockedContext->deregisterWatch(queueWatch_); + lockedContext->deregisterDispatchSource(queueDispatchSource_); lockedContext->deregisterDispatchSource(dispatchSource_); delete watchContext_; + delete timeoutContext_; } // ensure, the registry survives until disconnecting is done... @@ -170,21 +189,26 @@ DBusConnection::~DBusConnection() { } } - bool DBusConnection::attachMainLoopContext(std::weak_ptr<MainLoopContext> mainLoopContext) { mainLoopContext_ = mainLoopContext; if (auto lockedContext = mainLoopContext_.lock()) { - msgWatch_ = new DBusMessageWatch(shared_from_this()); - msgDispatchSource_ = new DBusMessageDispatchSource(msgWatch_); + queueWatch_ = new DBusQueueWatch(shared_from_this()); + queueDispatchSource_ = new DBusQueueDispatchSource(queueWatch_); - lockedContext->registerDispatchSource(msgDispatchSource_); - lockedContext->registerWatch(msgWatch_); + lockedContext->registerDispatchSource(queueDispatchSource_); + lockedContext->registerWatch(queueWatch_); dispatchSource_ = new DBusDispatchSource(this); watchContext_ = new WatchContext(mainLoopContext_, dispatchSource_, shared_from_this()); + timeoutContext_ = new TimeoutContext(mainLoopContext_, shared_from_this()); lockedContext->registerDispatchSource(dispatchSource_); + if (!isConnected()) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "not connected"); + return false; + } + dbus_connection_set_wakeup_main_function( connection_, &DBusConnection::onWakeupMainContext, @@ -208,7 +232,7 @@ bool DBusConnection::attachMainLoopContext(std::weak_ptr<MainLoopContext> mainLo &DBusConnection::onAddTimeout, &DBusConnection::onRemoveTimeout, &DBusConnection::onToggleTimeout, - &mainLoopContext_, + timeoutContext_, NULL); if (!success) { @@ -297,13 +321,13 @@ void DBusConnection::onToggleWatch(::DBusWatch* libdbusWatch, void* data) { dbus_bool_t DBusConnection::onAddTimeout(::DBusTimeout* libdbusTimeout, void* data) { - std::weak_ptr<MainLoopContext>* mainloop = static_cast<std::weak_ptr<MainLoopContext>*>(data); - if (NULL == mainloop) { - COMMONAPI_ERROR(std::string(__FUNCTION__), "mainloop == NULL"); + TimeoutContext* timeoutContext = static_cast<TimeoutContext*>(data); + if (NULL == timeoutContext) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "timeoutContext == NULL"); return FALSE; } - DBusTimeout* dbusTimeout = new DBusTimeout(libdbusTimeout, *mainloop); + DBusTimeout* dbusTimeout = new DBusTimeout(libdbusTimeout, timeoutContext->mainLoopContext_, timeoutContext->dbusConnection_); dbus_timeout_set_data(libdbusTimeout, dbusTimeout, NULL); if (dbusTimeout->isReadyToBeMonitored()) { @@ -364,11 +388,11 @@ bool DBusConnection::connect(DBusError &dbusError, bool startDispatchThread) { #ifdef _MSC_VER COMMONAPI_ERROR(std::string(__FUNCTION__) + ": Name: " + dbusError.getName() + - " Message: " + dbusError.getMessage()) + " Message: " + dbusError.getMessage()); #else COMMONAPI_ERROR(std::string(__PRETTY_FUNCTION__) + ": Name: " + dbusError.getName() + - " Message: " + dbusError.getMessage()) + " Message: " + dbusError.getMessage()); #endif return false; } @@ -404,8 +428,15 @@ void DBusConnection::disconnect() { std::lock_guard<std::mutex> dbusConnectionLock(connectionGuard_); std::unique_lock<std::mutex> dispatchLock(dispatchMutex_); + + std::shared_ptr<DBusServiceRegistry> itsRegistry = DBusServiceRegistry::get(shared_from_this()); + isDisconnecting_ = true; + if (std::shared_ptr<CommonAPI::MainLoopContext> mainLoopContext = mainLoopContext_.lock()) { + Factory::get()->releaseConnection(connectionId_); + } + if (isConnected()) { dbusConnectionStatusEvent_.notifyListeners(AvailabilityStatus::NOT_AVAILABLE); @@ -457,10 +488,6 @@ void DBusConnection::disconnect() { dbus_connection_unref(connection_); connection_ = nullptr; } - - if (std::shared_ptr<CommonAPI::MainLoopContext> mainLoopContext = mainLoopContext_.lock()) { - Factory::get()->releaseConnection(connectionId_, mainLoopContext.get()); - } } bool DBusConnection::isConnected() const { @@ -504,11 +531,11 @@ bool DBusConnection::requestServiceNameAndBlock(const std::string& serviceName) #ifdef _MSC_VER // Visual Studio COMMONAPI_ERROR(std::string(__FUNCTION__) + ": Name: " + dbusError.getName() + - " Message: " + dbusError.getMessage()) + " Message: " + dbusError.getMessage()); #else COMMONAPI_ERROR(std::string(__PRETTY_FUNCTION__) + ": Name: " + dbusError.getName() + - " Message: " + dbusError.getMessage()) + " Message: " + dbusError.getMessage()); #endif } } @@ -594,10 +621,11 @@ void DBusConnection::onLibdbusPendingCall(::DBusPendingCall* _libdbusPendingCall _dbusMessageReplyAsyncHandler->lock(); // libdbus calls the cleanup method below - if(_libdbusPendingCall) + if(_libdbusPendingCall && processAsyncHandler) { dbus_pending_call_unref(_libdbusPendingCall); + _dbusMessageReplyAsyncHandler->setExecutionFinished(); + } - _dbusMessageReplyAsyncHandler->setExecutionFinished(); if (_dbusMessageReplyAsyncHandler->hasToBeDeleted()) { _dbusMessageReplyAsyncHandler->unlock(); delete _dbusMessageReplyAsyncHandler; @@ -619,7 +647,6 @@ void DBusConnection::onLibdbusPendingCallNotifyThunk(::DBusPendingCall* _libdbus auto pendingCallNotificationData = reinterpret_cast<PendingCallNotificationData*>(_userData); auto dbusMessageReplyAsyncHandler = pendingCallNotificationData->replyAsyncHandler_; auto dbusConnection = pendingCallNotificationData->dbusConnection_; - delete pendingCallNotificationData; DBusMessage dbusMessage = DBusConnection::convertToDBusMessage(_libdbusPendingCall); @@ -627,9 +654,8 @@ void DBusConnection::onLibdbusPendingCallNotifyThunk(::DBusPendingCall* _libdbus } void DBusConnection::onLibdbusDataCleanup(void *_data) { - // Dummy method -> deleting of userData is not executed in this method. Deleting is - // executed by handling of the timeouts. - (void)_data; + auto pendingCallNotificationData = reinterpret_cast<PendingCallNotificationData*>(_data); + delete pendingCallNotificationData; } //Would not be needed if libdbus would actually handle its timeouts for pending calls. @@ -648,7 +674,7 @@ void DBusConnection::enforceAsynchronousTimeouts() const { auto minTimeout = std::get<0>(minTimeoutElement->second); - std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now(); + std::chrono::steady_clock::time_point now = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now(); timeout = (int)std::chrono::duration_cast<std::chrono::milliseconds>(minTimeout - now).count(); } @@ -662,7 +688,7 @@ void DBusConnection::enforceAsynchronousTimeouts() const { enforceTimeoutMutex_.lock(); auto it = timeoutMap_.begin(); while (it != timeoutMap_.end()) { - std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now(); + std::chrono::steady_clock::time_point now = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now(); DBusMessageReplyAsyncHandler* asyncHandler = std::get<1>(it->second); DBusPendingCall* libdbusPendingCall = it->first; @@ -683,7 +709,7 @@ void DBusConnection::enforceAsynchronousTimeouts() const { if (executionStarted && !executionFinished) { // execution of asyncHandler is still running // ==> add 100 ms for next timeout check - std::get<0>(it->second) = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(100); + std::get<0>(it->second) = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now() + std::chrono::milliseconds(100); } else { if (!executionFinished) { // execution of asyncHandler was not finished (and not started) @@ -729,6 +755,7 @@ void DBusConnection::enforceAsynchronousTimeouts() const { } enforceTimeoutMutex_.unlock(); } else { + std::lock_guard<std::mutex> itsLock(enforceTimeoutMutex_); auto it = timeoutMap_.begin(); @@ -768,18 +795,20 @@ void DBusConnection::enforceAsynchronousTimeouts() const { } } -std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync( +bool DBusConnection::sendDBusMessageWithReplyAsync( const DBusMessage& dbusMessage, std::unique_ptr<DBusMessageReplyAsyncHandler> dbusMessageReplyAsyncHandler, const CommonAPI::CallInfo *_info) const { + std::lock_guard<std::mutex> dbusConnectionLock(connectionGuard_); + if (!dbusMessage) { COMMONAPI_ERROR(std::string(__FUNCTION__), "message == NULL"); - return std::future<CallStatus>(); + return false; } if (!isConnected()) { COMMONAPI_ERROR(std::string(__FUNCTION__), "not connected"); - return std::future<CallStatus>(); + return false; } DBusPendingCall* libdbusPendingCall; @@ -787,14 +816,8 @@ std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync( DBusMessageReplyAsyncHandler* replyAsyncHandler = dbusMessageReplyAsyncHandler.release(); - std::future<CallStatus> callStatusFuture; - try { - callStatusFuture = replyAsyncHandler->getFuture(); - } catch (std::exception& e) { - (void)e; - } - PendingCallNotificationData* userData = new PendingCallNotificationData(this, replyAsyncHandler); + DBusTimeout::currentTimeout_ = NULL; libdbusSuccess = dbus_connection_send_with_reply_set_notify(connection_, dbusMessage.message_, @@ -811,10 +834,10 @@ std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync( if (!libdbusSuccess || !libdbusPendingCall) { #ifdef _MSC_VER // Visual Studio COMMONAPI_ERROR(std::string(__FUNCTION__) + - ": (!libdbusSuccess || !libdbusPendingCall) == true") + ": (!libdbusSuccess || !libdbusPendingCall) == true"); #else COMMONAPI_ERROR(std::string(__PRETTY_FUNCTION__) + - ": (!libdbusSuccess || !libdbusPendingCall) == true") + ": (!libdbusSuccess || !libdbusPendingCall) == true"); #endif if (libdbusPendingCall) { dbus_pending_call_unref(libdbusPendingCall); @@ -827,13 +850,13 @@ std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync( nullptr)); } mainLoopContext_.lock()->wakeup(); - return callStatusFuture; + return true; } if (_info->timeout_ != DBUS_TIMEOUT_INFINITE) { - auto timeoutPoint = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(_info->timeout_); + auto timeoutPoint = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now() + std::chrono::milliseconds(_info->timeout_); std::tuple< - std::chrono::time_point<std::chrono::high_resolution_clock>, + std::chrono::steady_clock::time_point, DBusMessageReplyAsyncHandler*, DBusMessage> toInsert { timeoutPoint, @@ -841,11 +864,20 @@ std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync( dbusMessage }; + if(DBusTimeout::currentTimeout_) + DBusTimeout::currentTimeout_->setPendingCall(libdbusPendingCall); + enforceTimeoutMutex_.lock(); auto ret = timeoutMap_.insert( { libdbusPendingCall, toInsert } ); if (ret.second == false) { // key has been reused // update the map value with the new info + DBusMessageReplyAsyncHandler* asyncHandler; + auto it = timeoutMap_.find(ret.first->first); + if(it != timeoutMap_.end()) { + asyncHandler = std::get<1>(it->second); + delete asyncHandler; + } timeoutMap_.erase(ret.first); timeoutMap_.insert( { libdbusPendingCall, toInsert } ); } @@ -860,7 +892,7 @@ std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync( timeoutInfiniteAsyncHandlers_.insert(replyAsyncHandler); } - return callStatusFuture; + return true; } DBusMessage DBusConnection::sendDBusMessageWithReplyAndBlock(const DBusMessage& dbusMessage, @@ -902,17 +934,24 @@ void DBusConnection::dispatchDBusMessageReply(const DBusMessage& _reply, } bool DBusConnection::singleDispatch() { + std::list<MainloopTimeout_t> mainloopTimeouts; { - std::lock_guard<std::mutex> itsLock(mainloopTimeoutsMutex_); + mainloopTimeoutsMutex_.lock(); for (auto t : mainloopTimeouts_) { - std::get<0>(t)->onDBusMessageReply(std::get<2>(t), std::get<1>(t)); - if (std::get<3>(t) != nullptr) { - dbus_pending_call_unref(std::get<3>(t)); - } - delete std::get<0>(t); + mainloopTimeouts.push_back(t); } mainloopTimeouts_.clear(); + mainloopTimeoutsMutex_.unlock(); } + + for (auto t : mainloopTimeouts) { + std::get<0>(t)->onDBusMessageReply(std::get<2>(t), std::get<1>(t)); + if (std::get<3>(t) != nullptr) { + dbus_pending_call_unref(std::get<3>(t)); + } + delete std::get<0>(t); + } + if(setDispatching(true)) { bool dispatchStatus(connection_ && dbus_connection_dispatch(connection_) == DBUS_DISPATCH_DATA_REMAINS); setDispatching(false); @@ -948,10 +987,13 @@ void DBusConnection::incrementConnection() { } void DBusConnection::decrementConnection() { - std::lock_guard < std::mutex > lock(activeConnectionsMutex_); - activeConnections_--; + int activeConnections = 0; + { + std::lock_guard < std::mutex > lock(activeConnectionsMutex_); + activeConnections = --activeConnections_; + } - if (activeConnections_ <= 0) { + if (activeConnections <= 0) { disconnect(); } } @@ -978,19 +1020,25 @@ bool DBusConnection::setDispatching(bool _isDispatching) { } } -void DBusConnection::sendPendingSelectiveSubscription(DBusProxy* callingProxy, std::string methodName, - DBusSignalHandler* dbusSignalHandler, uint32_t tag) { +void DBusConnection::sendPendingSelectiveSubscription(DBusProxy* callingProxy, std::string interfaceMemberName, + DBusSignalHandler* dbusSignalHandler, uint32_t tag, std::string interfaceMemberSignature) { bool outarg; + std::string methodName = "subscribeFor" + interfaceMemberName + "Selective"; DBusProxyHelper<CommonAPI::DBus::DBusSerializableArguments<>, CommonAPI::DBus::DBusSerializableArguments<bool>>::callMethodAsync( *callingProxy, methodName.c_str(), "", &CommonAPI::DBus::defaultCallInfo, - [this, dbusSignalHandler, callingProxy, tag] + [this, dbusSignalHandler, callingProxy, tag, interfaceMemberName, interfaceMemberSignature] (const CommonAPI::CallStatus& callStatus, const bool& accepted) { if (callStatus == CommonAPI::CallStatus::SUCCESS && accepted) { dbusSignalHandler->onSpecificError(CommonAPI::CallStatus::SUCCESS, tag); } else { + const DBusSignalHandlerPath itsToken(callingProxy->getDBusAddress().getObjectPath(), + callingProxy->getDBusAddress().getInterface(), + interfaceMemberName, + interfaceMemberSignature); + removeSignalMemberHandler(itsToken, dbusSignalHandler); dbusSignalHandler->onSpecificError(CommonAPI::CallStatus::SUBSCRIPTION_REFUSED, tag); } }, std::make_tuple(outarg)); @@ -1007,30 +1055,33 @@ void DBusConnection::subscribeForSelectiveBroadcast( std::string methodName = "subscribeFor" + interfaceMemberName + "Selective"; + DBusProxyConnection::DBusSignalHandlerToken token = addSignalMemberHandler( + objectPath, + interfaceName, + interfaceMemberName, + interfaceMemberSignature, + dbusSignalHandler, + true + ); + dbusSignalHandler->setSubscriptionToken(token, tag); + bool outarg; - DBusProxyHelper<CommonAPI::DBus::DBusSerializableArguments<>, - CommonAPI::DBus::DBusSerializableArguments<bool>>::callMethodAsync( - *callingProxy, methodName.c_str(), "", - &CommonAPI::DBus::defaultCallInfo, - [this, objectPath, interfaceName, interfaceMemberName, interfaceMemberSignature, dbusSignalHandler, callingProxy, tag] - (const CommonAPI::CallStatus& callStatus, const bool& accepted) { - if ((callStatus == CommonAPI::CallStatus::SUCCESS && accepted) || !callingProxy->isAvailable()) { - DBusProxyConnection::DBusSignalHandlerToken token = addSignalMemberHandler( - objectPath, - interfaceName, - interfaceMemberName, - interfaceMemberSignature, - dbusSignalHandler, - true - ); - dbusSignalHandler->setSubscriptionToken(token, tag); - } - if (accepted) { - dbusSignalHandler->onSpecificError(CommonAPI::CallStatus::SUCCESS, tag); - } else { - dbusSignalHandler->onSpecificError(CommonAPI::CallStatus::SUBSCRIPTION_REFUSED, tag); - } - }, std::make_tuple(outarg)); + if(callingProxy->isAvailable()) { + DBusProxyHelper<CommonAPI::DBus::DBusSerializableArguments<>, + CommonAPI::DBus::DBusSerializableArguments<bool>>::callMethodAsync( + *callingProxy, methodName.c_str(), "", + &CommonAPI::DBus::defaultCallInfo, + [this, objectPath, interfaceName, interfaceMemberName, interfaceMemberSignature, dbusSignalHandler, callingProxy, tag, token] + (const CommonAPI::CallStatus& callStatus, const bool& accepted) { + (void)callStatus; + if (accepted) { + dbusSignalHandler->onSpecificError(CommonAPI::CallStatus::SUCCESS, tag); + } else { + removeSignalMemberHandler(token, dbusSignalHandler); + dbusSignalHandler->onSpecificError(CommonAPI::CallStatus::SUBSCRIPTION_REFUSED, tag); + } + }, std::make_tuple(outarg)); + } } void DBusConnection::unsubscribeFromSelectiveBroadcast(const std::string& eventName, @@ -1585,6 +1636,9 @@ void notifyDBusOMSignalHandlers(DBusSignalHandlersTable& dbusSignalHandlerstable signalEntry->second.first->lock(); signalGuard_.unlock(); + // ensure, the registry survives + std::shared_ptr<DBusServiceRegistry> itsRegistry_ = DBusServiceRegistry::get(shared_from_this()); + notifyDBusSignalHandlers(dbusSignalHandlerTable_, signalEntry, dbusMessage, dbusHandlerResult); @@ -1668,14 +1722,27 @@ std::shared_ptr<DBusConnection> DBusConnection::wrap(::DBusConnection *_connecti return std::make_shared<DBusConnection>(_connection, _connectionId); } -void DBusConnection::pushDBusMessageReply(const DBusMessage& _reply, +void DBusConnection::pushDBusMessageReplyToMainLoop(const DBusMessage& _reply, std::unique_ptr<DBusMessageReplyAsyncHandler> _dbusMessageReplyAsyncHandler) { // push message to the message queue DBusMessageReplyAsyncHandler* replyAsyncHandler = _dbusMessageReplyAsyncHandler.release(); replyAsyncHandler->setHasToBeDeleted(); - std::shared_ptr<DBusMessageWatch::MsgReplyQueueEntry> msgReplyQueueEntry = std::make_shared<DBusMessageWatch::MsgReplyQueueEntry>( + std::shared_ptr<MsgReplyQueueEntry> msgReplyQueueEntry = std::make_shared<MsgReplyQueueEntry>( replyAsyncHandler, _reply); - msgWatch_->pushMsgQueue(msgReplyQueueEntry); + queueWatch_->pushQueue(msgReplyQueueEntry); +} + +void DBusConnection::setPendingCallTimedOut(DBusPendingCall* _pendingCall, ::DBusTimeout* _timeout) const { + std::lock_guard<std::mutex> lock(enforceTimeoutMutex_); + auto it = timeoutMap_.find(_pendingCall); + if(it != timeoutMap_.end()) { + auto replyAsyncHandler = std::get<1>(it->second); + replyAsyncHandler->lock(); + if(!replyAsyncHandler->getTimeoutOccurred()) { + dbus_timeout_handle(_timeout); + } + replyAsyncHandler->unlock(); + } } } // namespace DBus diff --git a/src/CommonAPI/DBus/DBusDaemonProxy.cpp b/src/CommonAPI/DBus/DBusDaemonProxy.cpp index 58ee08f..3b5b29f 100644 --- a/src/CommonAPI/DBus/DBusDaemonProxy.cpp +++ b/src/CommonAPI/DBus/DBusDaemonProxy.cpp @@ -62,7 +62,15 @@ bool DBusDaemonProxy::isAvailableBlocking() const { std::future<AvailabilityStatus> DBusDaemonProxy::isAvailableAsync( isAvailableAsyncCallback _callback, const CallInfo *_info) const { - return isAvailableAsync(_callback, _info); + (void)_callback; + (void)_info; + std::promise<AvailabilityStatus> promise; + if(isAvailable()) { + promise.set_value(CommonAPI::AvailabilityStatus::AVAILABLE); + } else { + promise.set_value(CommonAPI::AvailabilityStatus::NOT_AVAILABLE); + } + return promise.get_future(); } ProxyStatusEvent& DBusDaemonProxy::getProxyStatusEvent() { diff --git a/src/CommonAPI/DBus/DBusFactory.cpp b/src/CommonAPI/DBus/DBusFactory.cpp index d7fa84c..1c05455 100644 --- a/src/CommonAPI/DBus/DBusFactory.cpp +++ b/src/CommonAPI/DBus/DBusFactory.cpp @@ -47,7 +47,9 @@ Factory::init() { void Factory::registerInterface(InterfaceInitFunction _function) { +#ifndef WIN32 std::lock_guard<std::mutex> itsLock(initializerMutex_); +#endif if (isInitialized_) { // We are already running --> initialize the interface library! _function(); @@ -79,11 +81,15 @@ Factory::createProxy( DBusAddress dbusAddress; if (DBusAddressTranslator::get()->translate(address, dbusAddress)) { - std::shared_ptr<DBusProxy> proxy - = proxyCreateFunctionsIterator->second(dbusAddress, getConnection(_connectionId)); - if (proxy) - proxy->init(); - return proxy; + std::shared_ptr<DBusConnection> connection + = getConnection(_connectionId); + if (connection) { + std::shared_ptr<DBusProxy> proxy + = proxyCreateFunctionsIterator->second(dbusAddress, connection); + if (proxy) + proxy->init(); + return proxy; + } } } return nullptr; @@ -100,11 +106,15 @@ Factory::createProxy( DBusAddress dbusAddress; if (DBusAddressTranslator::get()->translate(address, dbusAddress)) { - std::shared_ptr<DBusProxy> proxy - = proxyCreateFunctionsIterator->second(dbusAddress, getConnection(_context)); - if (proxy) - proxy->init(); - return proxy; + std::shared_ptr<DBusConnection> connection + = getConnection(_context); + if (connection) { + std::shared_ptr<DBusProxy> proxy + = proxyCreateFunctionsIterator->second(dbusAddress, connection); + if (proxy) + proxy->init(); + return proxy; + } } } @@ -120,11 +130,15 @@ Factory::registerStub( CommonAPI::Address address(_domain, _interface, _instance); DBusAddress dbusAddress; if (DBusAddressTranslator::get()->translate(address, dbusAddress)) { - std::shared_ptr<DBusStubAdapter> adapter - = stubAdapterCreateFunctionsIterator->second(dbusAddress, getConnection(_connectionId), _stub); - if (adapter) { - adapter->init(adapter); - return registerStubAdapter(adapter); + std::shared_ptr<DBusConnection> connection = getConnection(_connectionId); + if (connection) { + std::shared_ptr<DBusStubAdapter> adapter + = stubAdapterCreateFunctionsIterator->second(dbusAddress, connection, _stub); + if (adapter) { + adapter->init(adapter); + if (registerStubAdapter(adapter)) + return true; + } } } } @@ -141,11 +155,15 @@ Factory::registerStub( CommonAPI::Address address(_domain, _interface, _instance); DBusAddress dbusAddress; if (DBusAddressTranslator::get()->translate(address, dbusAddress)) { - std::shared_ptr<DBusStubAdapter> adapter - = stubAdapterCreateFunctionsIterator->second(dbusAddress, getConnection(_context), _stub); - if (adapter) { - adapter->init(adapter); - return registerStubAdapter(adapter); + std::shared_ptr<DBusConnection> connection = getConnection(_context); + if (connection) { + std::shared_ptr<DBusStubAdapter> adapter + = stubAdapterCreateFunctionsIterator->second(dbusAddress, connection, _stub); + if (adapter) { + adapter->init(adapter); + if (registerStubAdapter(adapter)) + return true; + } } } } @@ -175,8 +193,10 @@ Factory::unregisterStub(const std::string &_domain, const std::string &_interfac services_.erase(adapterResult->first); + decrementConnection(connection); + return true; - } + } return false; } @@ -245,20 +265,29 @@ Factory::unregisterStubAdapter(std::shared_ptr<DBusStubAdapter> _adapter) { /////////////////////////////////////////////////////////////////////////////// std::shared_ptr<DBusConnection> Factory::getConnection(const ConnectionId_t &_connectionId) { - - std::lock_guard<std::mutex> itsGuard(connectionsMutex_); + std::lock_guard<std::recursive_mutex> itsConnectionGuard(connectionsMutex_); + std::shared_ptr<DBusConnection> itsConnection; auto itsConnectionIterator = connections_.find(_connectionId); - if (itsConnectionIterator != connections_.end()) { - return itsConnectionIterator->second; + if (itsConnectionIterator != connections_.end()) + itsConnection = itsConnectionIterator->second; + + if(!itsConnection) { + // No connection found, lets create and initialize one + const DBusType_t dbusType = DBusAddressTranslator::get()->getDBusBusType(_connectionId); + itsConnection = std::make_shared<DBusConnection>(dbusType, _connectionId); + + if (itsConnection) { + if (!itsConnection->connect(true)) { + COMMONAPI_ERROR("Failed to create connection ", _connectionId); + itsConnection.reset(); + } else { + connections_.insert({ _connectionId, itsConnection } ); + } + } } - // No connection found, lets create and initialize one - const DBusType_t dbusType = DBusAddressTranslator::get()->getDBusBusType(_connectionId); - std::shared_ptr<DBusConnection> itsConnection - = std::make_shared<DBusConnection>(dbusType, _connectionId); - connections_.insert({ _connectionId, itsConnection }); + incrementConnection(itsConnection); - itsConnection->connect(true); return itsConnection; } @@ -267,21 +296,29 @@ Factory::getConnection(std::shared_ptr<MainLoopContext> _context) { if (!_context) return getConnection(DEFAULT_CONNECTION_ID); - std::lock_guard<std::mutex> itsGuard(contextConnectionsMutex_); - auto itsConnectionIterator = contextConnections_.find(_context.get()); - if (itsConnectionIterator != contextConnections_.end()) { - return itsConnectionIterator->second; + std::lock_guard<std::recursive_mutex> itsConnectionGuard(connectionsMutex_); + std::shared_ptr<DBusConnection> itsConnection; + auto itsConnectionIterator = connections_.find(_context->getName()); + if (itsConnectionIterator != connections_.end()) + itsConnection = itsConnectionIterator->second; + + if(!itsConnection) { + // No connection found, lets create and initialize one + const DBusType_t dbusType = DBusAddressTranslator::get()->getDBusBusType(_context->getName()); + itsConnection = std::make_shared<DBusConnection>(dbusType, _context->getName()); + + if (itsConnection) { + if (!itsConnection->connect(false)) { + itsConnection.reset(); + } else { + connections_.insert({ _context->getName(), itsConnection } ); + itsConnection->attachMainLoopContext(_context); + } + } } - // No connection found, lets create and initialize one - const DBusType_t dbusType = DBusAddressTranslator::get()->getDBusBusType(_context->getName()); - std::shared_ptr<DBusConnection> itsConnection - = std::make_shared<DBusConnection>(dbusType, _context->getName()); - contextConnections_.insert({ _context.get(), itsConnection } ); - - itsConnection->connect(false); - if (_context) - itsConnection->attachMainLoopContext(_context); + if (itsConnection) + incrementConnection(itsConnection); return itsConnection; } @@ -343,9 +380,11 @@ Factory::registerManagedService(const std::shared_ptr<DBusStubAdapter> &_stubAda services_.erase(insertResult.first); } + if(isAcquired) + incrementConnection(_stubAdapter->getDBusConnection()); + return isAcquired; } - return false; } @@ -370,90 +409,47 @@ Factory::unregisterManagedService(const ServicesMap::iterator &iterator) { if (isUnregistered) { connection->releaseServiceName(serviceName); services_.erase(iterator); + decrementConnection(connection); } // TODO: log error return isUnregistered; } void Factory::incrementConnection(std::shared_ptr<DBusProxyConnection> _connection) { + std::lock_guard<std::recursive_mutex> itsConnectionGuard(connectionsMutex_); std::shared_ptr<DBusConnection> connection; - { - std::lock_guard<std::mutex> itsConnectionGuard(connectionsMutex_); - for (auto itsConnectionIterator = connections_.begin(); itsConnectionIterator != connections_.end(); itsConnectionIterator++) { - if (itsConnectionIterator->second == _connection) { - connection = itsConnectionIterator->second; - break; - } + for (auto itsConnectionIterator = connections_.begin(); itsConnectionIterator != connections_.end(); itsConnectionIterator++) { + if (itsConnectionIterator->second == _connection) { + connection = itsConnectionIterator->second; + break; } } if(connection) connection->incrementConnection(); - - std::shared_ptr<DBusConnection> contextConnection; - { - std::lock_guard<std::mutex> itsContextConnectionGuard(contextConnectionsMutex_); - for (auto itsConnectionIterator = contextConnections_.begin(); itsConnectionIterator != contextConnections_.end(); itsConnectionIterator++) { - if (itsConnectionIterator->second == _connection) { - contextConnection = itsConnectionIterator->second; - break; - } - } - } - - if(contextConnection) - contextConnection->incrementConnection(); } void Factory::decrementConnection(std::shared_ptr<DBusProxyConnection> _connection) { + std::lock_guard<std::recursive_mutex> itsConnectionGuard(connectionsMutex_); std::shared_ptr<DBusConnection> connection; - { - std::lock_guard<std::mutex> itsConnectionGuard(connectionsMutex_); - for (auto itsConnectionIterator = connections_.begin(); itsConnectionIterator != connections_.end(); itsConnectionIterator++) { - if (itsConnectionIterator->second == _connection) { - connection = itsConnectionIterator->second; - break; - } + for (auto itsConnectionIterator = connections_.begin(); itsConnectionIterator != connections_.end(); itsConnectionIterator++) { + if (itsConnectionIterator->second == _connection) { + connection = itsConnectionIterator->second; + break; } } if(connection) connection->decrementConnection(); - - std::shared_ptr<DBusConnection> contextConnection; - { - std::lock_guard<std::mutex> itsContextConnectionGuard(contextConnectionsMutex_); - for (auto itsConnectionIterator = contextConnections_.begin(); itsConnectionIterator != contextConnections_.end(); itsConnectionIterator++) { - if (itsConnectionIterator->second == _connection) { - contextConnection = itsConnectionIterator->second; - break; - } - } - } - - if(contextConnection) - contextConnection->decrementConnection(); } -void Factory::releaseConnection(const ConnectionId_t& _connectionId, MainLoopContext* _mainloopContext) { - { - std::lock_guard<std::mutex> itsConnectionGuard(connectionsMutex_); - auto connection = connections_.find(_connectionId); +void Factory::releaseConnection(const ConnectionId_t& _connectionId) { + std::lock_guard<std::recursive_mutex> itsConnectionGuard(connectionsMutex_); + auto itsConnection = connections_.find(_connectionId); - if (connection != connections_.end()) { - DBusServiceRegistry::remove(connection->second); - connections_.erase(_connectionId); - } - } - - { - std::lock_guard<std::mutex> itsContextConnectionGuard(contextConnectionsMutex_); - auto connectionContext = contextConnections_.find(_mainloopContext); - - if (connectionContext != contextConnections_.end()) { - DBusServiceRegistry::remove(connectionContext->second); - contextConnections_.erase(_mainloopContext); - } + if (itsConnection != connections_.end()) { + DBusServiceRegistry::remove(itsConnection->second); + connections_.erase(_connectionId); } } diff --git a/src/CommonAPI/DBus/DBusInstanceAvailabilityStatusChangedEvent.cpp b/src/CommonAPI/DBus/DBusInstanceAvailabilityStatusChangedEvent.cpp new file mode 100644 index 0000000..4f93feb --- /dev/null +++ b/src/CommonAPI/DBus/DBusInstanceAvailabilityStatusChangedEvent.cpp @@ -0,0 +1,269 @@ +// Copyright (C) 2013-2015 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#include <CommonAPI/DBus/DBusInstanceAvailabilityStatusChangedEvent.hpp> + +#include <CommonAPI/DBus/DBusAddressTranslator.hpp> + +namespace CommonAPI { +namespace DBus { + +DBusInstanceAvailabilityStatusChangedEvent::DBusInstanceAvailabilityStatusChangedEvent( + DBusProxy &_proxy, + const std::string &_dbusInterfaceName, + const std::string &_capiInterfaceName) : + proxy_(_proxy), + observedDbusInterfaceName_(_dbusInterfaceName), + observedCapiInterfaceName_(_capiInterfaceName), + registry_(DBusServiceRegistry::get(_proxy.getDBusConnection())) { +} + +DBusInstanceAvailabilityStatusChangedEvent::~DBusInstanceAvailabilityStatusChangedEvent() { + proxy_.removeSignalMemberHandler(interfacesAddedSubscription_, this); + proxy_.removeSignalMemberHandler(interfacesRemovedSubscription_, this); +} + +void DBusInstanceAvailabilityStatusChangedEvent::onSignalDBusMessage(const DBusMessage& dbusMessage) { + if (dbusMessage.hasMemberName("InterfacesAdded")) { + onInterfacesAddedSignal(dbusMessage); + } else if (dbusMessage.hasMemberName("InterfacesRemoved")) { + onInterfacesRemovedSignal(dbusMessage); + } +} + +void DBusInstanceAvailabilityStatusChangedEvent::getAvailableServiceInstances( + CommonAPI::CallStatus &_status, + std::vector<DBusAddress> &_availableServiceInstances) { + + _availableServiceInstances.clear(); + DBusObjectManagerStub::DBusObjectPathAndInterfacesDict itsAvailableServiceInstances; + registry_->getAvailableServiceInstances(proxy_.getDBusAddress().getService(), + proxy_.getDBusAddress().getObjectPath(), + itsAvailableServiceInstances); + + _status = CommonAPI::CallStatus::SUCCESS; + translate(itsAvailableServiceInstances, _availableServiceInstances); +} + +std::future<CallStatus> DBusInstanceAvailabilityStatusChangedEvent::getAvailableServiceInstancesAsync( + GetAvailableServiceInstancesCallback _callback) { + + std::shared_ptr<std::promise<CallStatus> > promise = std::make_shared<std::promise<CallStatus>>(); + registry_->getAvailableServiceInstancesAsync(std::bind( + &DBusInstanceAvailabilityStatusChangedEvent::serviceInstancesAsyncCallback, + this, + proxy_.shared_from_this(), + std::placeholders::_1, + _callback, + promise), + proxy_.getDBusAddress().getService(), + proxy_.getDBusAddress().getObjectPath()); + return promise->get_future(); +} + +void DBusInstanceAvailabilityStatusChangedEvent::getServiceInstanceAvailabilityStatus( + const std::string &_instance, + CallStatus &_callStatus, + AvailabilityStatus &_availabilityStatus) { + + CommonAPI::Address itsAddress("local", observedCapiInterfaceName_, _instance); + DBusAddress itsDBusAddress; + DBusAddressTranslator::get()->translate(itsAddress, itsDBusAddress); + + _availabilityStatus = AvailabilityStatus::NOT_AVAILABLE; + if (registry_->isServiceInstanceAlive( + itsDBusAddress.getInterface(), + itsDBusAddress.getService(), + itsDBusAddress.getObjectPath())) { + _availabilityStatus = AvailabilityStatus::AVAILABLE; + } + _callStatus = CallStatus::SUCCESS; +} + +std::future<CallStatus> DBusInstanceAvailabilityStatusChangedEvent::getServiceInstanceAvailabilityStatusAsync( + const std::string& _instance, + ProxyManager::GetInstanceAvailabilityStatusCallback _callback) { + + std::shared_ptr<std::promise<CallStatus> > promise = std::make_shared<std::promise<CallStatus>>(); + auto proxy = proxy_.shared_from_this(); + std::async(std::launch::async, [this, _instance, _callback, promise, proxy]() { + CallStatus callStatus; + AvailabilityStatus availabilityStatus; + getServiceInstanceAvailabilityStatus(_instance, callStatus, availabilityStatus); + _callback(callStatus, availabilityStatus); + promise->set_value(callStatus); + }); + + return promise->get_future(); +} + +void DBusInstanceAvailabilityStatusChangedEvent::onFirstListenerAdded(const Listener&) { + + interfacesAddedSubscription_ = proxy_.addSignalMemberHandler( + proxy_.getDBusAddress().getObjectPath(), + DBusObjectManagerStub::getInterfaceName(), + "InterfacesAdded", + "oa{sa{sv}}", + this, + false); + + interfacesRemovedSubscription_ = proxy_.addSignalMemberHandler( + proxy_.getDBusAddress().getObjectPath(), + DBusObjectManagerStub::getInterfaceName(), + "InterfacesRemoved", + "oas", + this, + false); + + getAvailableServiceInstancesAsync([&](const CallStatus &_status, + const std::vector<DBusAddress> &_availableServices) { + if(_status == CallStatus::SUCCESS) { + for(auto service : _availableServices) { + if(service.getInterface() != observedDbusInterfaceName_) + continue; + if(addInterface(service.getObjectPath(), observedDbusInterfaceName_)) + notifyInterfaceStatusChanged(service.getObjectPath(), observedDbusInterfaceName_, AvailabilityStatus::AVAILABLE); + } + } + }); +} + +void DBusInstanceAvailabilityStatusChangedEvent::onLastListenerRemoved(const Listener&) { + proxy_.removeSignalMemberHandler(interfacesAddedSubscription_, this); + proxy_.removeSignalMemberHandler(interfacesRemovedSubscription_, this); +} + +void DBusInstanceAvailabilityStatusChangedEvent::onInterfacesAddedSignal(const DBusMessage &_message) { + DBusInputStream dbusInputStream(_message); + std::string dbusObjectPath; + std::string dbusInterfaceName; + DBusInterfacesAndPropertiesDict dbusInterfacesAndPropertiesDict; + + dbusInputStream >> dbusObjectPath; + if (dbusInputStream.hasError()) { + COMMONAPI_ERROR(std::string(__FUNCTION__) + " failed to read object path"); + } + + dbusInputStream.beginReadMapOfSerializableStructs(); + while (!dbusInputStream.readMapCompleted()) { + dbusInputStream.align(8); + dbusInputStream >> dbusInterfaceName; + dbusInputStream.skipMap(); + if (dbusInputStream.hasError()) { + COMMONAPI_ERROR(std::string(__FUNCTION__) + " failed to read interface name"); + } + if(dbusInterfaceName == observedDbusInterfaceName_ && addInterface(dbusObjectPath, dbusInterfaceName)) { + notifyInterfaceStatusChanged(dbusObjectPath, dbusInterfaceName, AvailabilityStatus::AVAILABLE); + } + } + dbusInputStream.endReadMapOfSerializableStructs(); +} + +void DBusInstanceAvailabilityStatusChangedEvent::onInterfacesRemovedSignal(const DBusMessage &_message) { + DBusInputStream dbusInputStream(_message); + std::string dbusObjectPath; + std::vector<std::string> dbusInterfaceNames; + + dbusInputStream >> dbusObjectPath; + if (dbusInputStream.hasError()) { + COMMONAPI_ERROR(std::string(__FUNCTION__) + " failed to read object path"); + } + + dbusInputStream >> dbusInterfaceNames; + if (dbusInputStream.hasError()) { + COMMONAPI_ERROR(std::string(__FUNCTION__) + " failed to read interface names"); + } + + for (const auto& dbusInterfaceName : dbusInterfaceNames) { + if(dbusInterfaceName == observedDbusInterfaceName_ && removeInterface(dbusObjectPath, dbusInterfaceName)) { + notifyInterfaceStatusChanged(dbusObjectPath, dbusInterfaceName, AvailabilityStatus::NOT_AVAILABLE); + } + } +} + +void DBusInstanceAvailabilityStatusChangedEvent::notifyInterfaceStatusChanged( + const std::string &_objectPath, + const std::string &_interfaceName, + const AvailabilityStatus &_availability) { + CommonAPI::Address itsAddress; + DBusAddress itsDBusAddress(proxy_.getDBusAddress().getService(), + _objectPath, + _interfaceName); + + DBusAddressTranslator::get()->translate(itsDBusAddress, itsAddress); + + // ensure, the proxy and the event survives until notification is done + auto itsProxy = proxy_.shared_from_this(); + + notifyListeners(itsAddress.getAddress(), _availability); +} + +bool DBusInstanceAvailabilityStatusChangedEvent::addInterface( + const std::string &_dbusObjectPath, + const std::string &_dbusInterfaceName) { + std::lock_guard<std::mutex> lock(interfacesMutex_); + auto it = interfaces_.find(_dbusObjectPath); + if (it == interfaces_.end()) { + std::set<std::string> itsInterfaces; + itsInterfaces.insert(_dbusInterfaceName); + interfaces_[_dbusObjectPath] = itsInterfaces; + return true; + } else { + if(it->second.insert(_dbusInterfaceName).second) + return true; + } + return false; +} + +bool DBusInstanceAvailabilityStatusChangedEvent::removeInterface( + const std::string &_dbusObjectPath, + const std::string &_dbusInterfaceName) { + std::lock_guard<std::mutex> lock(interfacesMutex_); + auto it = interfaces_.find(_dbusObjectPath); + if(it != interfaces_.end()) { + if(it->second.erase(_dbusInterfaceName) > 0) + return true; + } + return false; +} + +void DBusInstanceAvailabilityStatusChangedEvent::serviceInstancesAsyncCallback( + std::shared_ptr<Proxy> _proxy, + const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict _dict, + GetAvailableServiceInstancesCallback &_call, + std::shared_ptr<std::promise<CallStatus> > &_promise) { + (void)_proxy; + std::vector<DBusAddress> result; + translate(_dict, result); + _call(CallStatus::SUCCESS, result); + _promise->set_value(CallStatus::SUCCESS); +} + +void DBusInstanceAvailabilityStatusChangedEvent::translate( + const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict &_dict, + std::vector<DBusAddress> &_serviceInstances) { + DBusAddress itsDBusAddress; + + const std::string &itsService = proxy_.getDBusAddress().getService(); + itsDBusAddress.setService(itsService); + + for (const auto &objectPathIter : _dict) { + itsDBusAddress.setObjectPath(objectPathIter.first); + + const auto &interfacesDict = objectPathIter.second; + for (const auto &interfaceIter : interfacesDict) { + + if (interfaceIter.first == observedDbusInterfaceName_) { + itsDBusAddress.setInterface(interfaceIter.first); + _serviceInstances.push_back(itsDBusAddress); + } + } + } +} + +} // namespace DBus +} // namespace CommonAPI + + diff --git a/src/CommonAPI/DBus/DBusMainLoopContext.cpp b/src/CommonAPI/DBus/DBusMainLoopContext.cpp index 5c8afa5..428807f 100644 --- a/src/CommonAPI/DBus/DBusMainLoopContext.cpp +++ b/src/CommonAPI/DBus/DBusMainLoopContext.cpp @@ -41,36 +41,36 @@ bool DBusDispatchSource::dispatch() { return dbusConnection_->singleDispatch(); } -DBusMessageDispatchSource::DBusMessageDispatchSource(DBusMessageWatch* watch) : +DBusQueueDispatchSource::DBusQueueDispatchSource(DBusQueueWatch* watch) : watch_(watch) { watch_->addDependentDispatchSource(this); } -DBusMessageDispatchSource::~DBusMessageDispatchSource() { +DBusQueueDispatchSource::~DBusQueueDispatchSource() { std::unique_lock<std::mutex> itsLock(watchMutex_); watch_->removeDependentDispatchSource(this); } -bool DBusMessageDispatchSource::prepare(int64_t& timeout) { +bool DBusQueueDispatchSource::prepare(int64_t& timeout) { std::unique_lock<std::mutex> itsLock(watchMutex_); timeout = -1; - return !watch_->emptyMsgQueue(); + return !watch_->emptyQueue(); } -bool DBusMessageDispatchSource::check() { +bool DBusQueueDispatchSource::check() { std::unique_lock<std::mutex> itsLock(watchMutex_); - return !watch_->emptyMsgQueue(); + return !watch_->emptyQueue(); } -bool DBusMessageDispatchSource::dispatch() { +bool DBusQueueDispatchSource::dispatch() { std::unique_lock<std::mutex> itsLock(watchMutex_); - if (!watch_->emptyMsgQueue()) { - auto queueEntry = watch_->frontMsgQueue(); - watch_->popMsgQueue(); - watch_->processMsgQueueEntry(queueEntry); + if (!watch_->emptyQueue()) { + auto queueEntry = watch_->frontQueue(); + watch_->popQueue(); + watch_->processQueueEntry(queueEntry); } - return !watch_->emptyMsgQueue(); + return !watch_->emptyQueue(); } DBusWatch::DBusWatch(::DBusWatch* libdbusWatch, std::weak_ptr<MainLoopContext>& mainLoopContext, @@ -179,19 +179,7 @@ void DBusWatch::addDependentDispatchSource(DispatchSource* dispatchSource) { dependentDispatchSources_.push_back(dispatchSource); } -void DBusMessageWatch::MsgReplyQueueEntry::process(std::shared_ptr<DBusConnection> _connection) { - _connection->dispatchDBusMessageReply(message_, replyAsyncHandler_); -} - -void DBusMessageWatch::MsgReplyQueueEntry::clear() { - delete replyAsyncHandler_; -} - -void DBusMessageWatch::MsgQueueEntry::clear() { - -} - -DBusMessageWatch::DBusMessageWatch(std::shared_ptr<DBusConnection> _connection) : pipeValue_(4) { +DBusQueueWatch::DBusQueueWatch(std::shared_ptr<DBusConnection> _connection) : pipeValue_(4) { #ifdef WIN32 std::string pipeName = "\\\\.\\pipe\\CommonAPI-DBus-"; @@ -280,7 +268,7 @@ DBusMessageWatch::DBusMessageWatch(std::shared_ptr<DBusConnection> _connection) connection_ = _connection; } -DBusMessageWatch::~DBusMessageWatch() { +DBusQueueWatch::~DBusQueueWatch() { #ifdef WIN32 BOOL retVal = DisconnectNamedPipe((HANDLE)pipeFileDescriptors_[0]); @@ -304,36 +292,36 @@ DBusMessageWatch::~DBusMessageWatch() { close(pipeFileDescriptors_[1]); #endif - std::unique_lock<std::mutex> itsLock(msgQueueMutex_); - while(!msgQueue_.empty()) { - auto queueEntry = msgQueue_.front(); - msgQueue_.pop(); + std::unique_lock<std::mutex> itsLock(queueMutex_); + while(!queue_.empty()) { + auto queueEntry = queue_.front(); + queue_.pop(); queueEntry->clear(); } } -void DBusMessageWatch::dispatch(unsigned int) { +void DBusQueueWatch::dispatch(unsigned int) { } -const pollfd& DBusMessageWatch::getAssociatedFileDescriptor() { +const pollfd& DBusQueueWatch::getAssociatedFileDescriptor() { return pollFileDescriptor_; } #ifdef WIN32 -const HANDLE& DBusMessageWatch::getAssociatedEvent() { +const HANDLE& DBusQueueWatch::getAssociatedEvent() { return wsaEvent_; } #endif -const std::vector<DispatchSource*>& DBusMessageWatch::getDependentDispatchSources() { +const std::vector<DispatchSource*>& DBusQueueWatch::getDependentDispatchSources() { return dependentDispatchSources_; } -void DBusMessageWatch::addDependentDispatchSource(CommonAPI::DispatchSource* _dispatchSource) { +void DBusQueueWatch::addDependentDispatchSource(CommonAPI::DispatchSource* _dispatchSource) { dependentDispatchSources_.push_back(_dispatchSource); } -void DBusMessageWatch::removeDependentDispatchSource(CommonAPI::DispatchSource* _dispatchSource) { +void DBusQueueWatch::removeDependentDispatchSource(CommonAPI::DispatchSource* _dispatchSource) { std::vector<CommonAPI::DispatchSource*>::iterator it; for (it = dependentDispatchSources_.begin(); it != dependentDispatchSources_.end(); it++) { @@ -344,9 +332,9 @@ void DBusMessageWatch::removeDependentDispatchSource(CommonAPI::DispatchSource* } } -void DBusMessageWatch::pushMsgQueue(std::shared_ptr<MsgQueueEntry> _queueEntry) { - std::unique_lock<std::mutex> itsLock(msgQueueMutex_); - msgQueue_.push(_queueEntry); +void DBusQueueWatch::pushQueue(std::shared_ptr<QueueEntry> _queueEntry) { + std::unique_lock<std::mutex> itsLock(queueMutex_); + queue_.push(_queueEntry); #ifdef WIN32 char writeValue[sizeof(pipeValue_)]; @@ -371,8 +359,8 @@ void DBusMessageWatch::pushMsgQueue(std::shared_ptr<MsgQueueEntry> _queueEntry) #endif } -void DBusMessageWatch::popMsgQueue() { - std::unique_lock<std::mutex> itsLock(msgQueueMutex_); +void DBusQueueWatch::popQueue() { + std::unique_lock<std::mutex> itsLock(queueMutex_); #ifdef WIN32 char readValue[sizeof(pipeValue_)]; @@ -396,32 +384,42 @@ void DBusMessageWatch::popMsgQueue() { } #endif - msgQueue_.pop(); + queue_.pop(); } -std::shared_ptr<DBusMessageWatch::MsgQueueEntry> DBusMessageWatch::frontMsgQueue() { - std::unique_lock<std::mutex> itsLock(msgQueueMutex_); +std::shared_ptr<QueueEntry> DBusQueueWatch::frontQueue() { + std::unique_lock<std::mutex> itsLock(queueMutex_); - return msgQueue_.front(); + return queue_.front(); } -bool DBusMessageWatch::emptyMsgQueue() { - std::unique_lock<std::mutex> itsLock(msgQueueMutex_); +bool DBusQueueWatch::emptyQueue() { + std::unique_lock<std::mutex> itsLock(queueMutex_); - return msgQueue_.empty(); + return queue_.empty(); } -void DBusMessageWatch::processMsgQueueEntry(std::shared_ptr<DBusMessageWatch::MsgQueueEntry> _queueEntry) { +void DBusQueueWatch::processQueueEntry(std::shared_ptr<QueueEntry> _queueEntry) { std::shared_ptr<DBusConnection> itsConnection = connection_.lock(); if(itsConnection) { _queueEntry->process(itsConnection); } } -DBusTimeout::DBusTimeout(::DBusTimeout* libdbusTimeout, std::weak_ptr<MainLoopContext>& mainLoopContext) : +#ifdef WIN32 +__declspec(thread) DBusTimeout* DBusTimeout::currentTimeout_ = NULL; +#else +thread_local DBusTimeout* DBusTimeout::currentTimeout_ = NULL; +#endif + +DBusTimeout::DBusTimeout(::DBusTimeout* libdbusTimeout, std::weak_ptr<MainLoopContext>& mainLoopContext, + std::weak_ptr<DBusConnection>& dbusConnection) : dueTimeInMs_(TIMEOUT_INFINITE), libdbusTimeout_(libdbusTimeout), - mainLoopContext_(mainLoopContext) { + mainLoopContext_(mainLoopContext), + dbusConnection_(dbusConnection), + pendingCall_(NULL) { + currentTimeout_ = this; } bool DBusTimeout::isReadyToBeMonitored() { @@ -447,9 +445,16 @@ void DBusTimeout::stopMonitoring() { } bool DBusTimeout::dispatch() { - recalculateDueTime(); - dbus_timeout_handle(libdbusTimeout_); - return true; + std::shared_ptr<DBusConnection> itsConnection = dbusConnection_.lock(); + if(itsConnection) { + if(itsConnection->setDispatching(true)) { + recalculateDueTime(); + itsConnection->setPendingCallTimedOut(pendingCall_, libdbusTimeout_); + itsConnection->setDispatching(false); + return true; + } + } + return false; } int64_t DBusTimeout::getTimeoutInterval() const { @@ -469,5 +474,9 @@ void DBusTimeout::recalculateDueTime() { } } +void DBusTimeout::setPendingCall(DBusPendingCall* _pendingCall) { + pendingCall_ = _pendingCall; +} + } // namespace DBus } // namespace CommonAPI diff --git a/src/CommonAPI/DBus/DBusOutputStream.cpp b/src/CommonAPI/DBus/DBusOutputStream.cpp index 6fd5d0a..564aa61 100644 --- a/src/CommonAPI/DBus/DBusOutputStream.cpp +++ b/src/CommonAPI/DBus/DBusOutputStream.cpp @@ -73,7 +73,7 @@ DBusOutputStream& DBusOutputStream::writeString(const char *_value, const uint32 if (NULL == _value) { COMMONAPI_ERROR(std::string(__FUNCTION__) + " _value == NULL"); } else if (_value[_length] != '\0') { - COMMONAPI_ERROR(std::string(__FUNCTION__) + " _value is not zero-terminated") + COMMONAPI_ERROR(std::string(__FUNCTION__) + " _value is not zero-terminated"); } else { _writeValue(_length); _writeRaw(_value, _length + 1); @@ -84,7 +84,12 @@ DBusOutputStream& DBusOutputStream::writeString(const char *_value, const uint32 DBusOutputStream& DBusOutputStream::writeByteBuffer(const uint8_t *_value, const uint32_t &_length) { if (NULL == _value) { - COMMONAPI_ERROR(std::string(__FUNCTION__) + " _value == NULL"); + if (0 != _length) { + COMMONAPI_ERROR(std::string(__FUNCTION__) + " _value == NULL && _length != 0"); + } else { + COMMONAPI_WARNING(std::string(__FUNCTION__) + " _value == NULL"); + _writeValue(_length); + } } else { _writeValue(_length); _writeRaw(reinterpret_cast<const char*>(_value), _length); diff --git a/src/CommonAPI/DBus/DBusProxy.cpp b/src/CommonAPI/DBus/DBusProxy.cpp index 95c3d59..655fa6c 100644 --- a/src/CommonAPI/DBus/DBusProxy.cpp +++ b/src/CommonAPI/DBus/DBusProxy.cpp @@ -9,6 +9,7 @@ #include <CommonAPI/DBus/DBusProxy.hpp> #include <CommonAPI/DBus/DBusUtils.hpp> #include <CommonAPI/DBus/DBusProxyAsyncSignalMemberCallbackHandler.hpp> +#include <CommonAPI/DBus/DBusConnection.hpp> #include <CommonAPI/Logger.hpp> namespace CommonAPI { @@ -18,10 +19,36 @@ DBusProxyStatusEvent::DBusProxyStatusEvent(DBusProxy *_dbusProxy) : dbusProxy_(_dbusProxy) { } -void DBusProxyStatusEvent::onListenerAdded(const Listener &_listener, const Subscription _subscription) { - (void)_subscription; - if (dbusProxy_->isAvailable()) - _listener(AvailabilityStatus::AVAILABLE); +void DBusProxyStatusEvent::onListenerAdded(const Listener &_listener, + const Subscription _subscription) { + std::lock_guard<std::recursive_mutex> listenersLock(listenersMutex_); + + //notify listener about availability status -> push function to mainloop + std::weak_ptr<DBusProxy> itsdbusProxy = dbusProxy_->shared_from_this(); + std::function<void(std::weak_ptr<DBusProxy>, Listener, Subscription)> notifySpecificListenerHandler = + std::bind(&DBusProxy::notifySpecificListener, + dbusProxy_, + std::placeholders::_1, + std::placeholders::_2, + std::placeholders::_3); + dbusProxy_->getDBusConnection()->proxyPushFunctionToMainLoop<DBusConnection>( + notifySpecificListenerHandler, + itsdbusProxy, + _listener, + _subscription); +} + +void DBusProxyStatusEvent::onListenerRemoved(const Listener& _listener, + const Subscription _subscription) { + std::lock_guard<std::recursive_mutex> listenersLock(listenersMutex_); + (void)_listener; + auto listenerIt = listeners_.begin(); + while(listenerIt != listeners_.end()) { + if(listenerIt->first == _subscription) + listenerIt = listeners_.erase(listenerIt); + else + ++listenerIt; + } } void DBusProxy::availabilityTimeoutThreadHandler() const { @@ -36,7 +63,7 @@ void DBusProxy::availabilityTimeoutThreadHandler() const { isAvailableAsyncCallback, std::promise<AvailabilityStatus>, AvailabilityStatus, - std::chrono::time_point<std::chrono::high_resolution_clock> + std::chrono::steady_clock::time_point > CallbackData_t; std::list<CallbackData_t> callbacks; @@ -46,14 +73,14 @@ void DBusProxy::availabilityTimeoutThreadHandler() const { timeoutsMutex_.lock(); int timeout = std::numeric_limits<int>::max(); - std::chrono::time_point<std::chrono::high_resolution_clock> minTimeout; + std::chrono::steady_clock::time_point minTimeout; if (timeouts_.size() > 0) { auto minTimeoutElement = std::min_element(timeouts_.begin(), timeouts_.end(), [] (const AvailabilityTimeout_t& lhs, const AvailabilityTimeout_t& rhs) { return std::get<0>(lhs) < std::get<0>(rhs); }); minTimeout = std::get<0>(*minTimeoutElement); - std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now(); + std::chrono::steady_clock::time_point now = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now(); timeout = (int)std::chrono::duration_cast<std::chrono::milliseconds>(minTimeout - now).count(); } timeoutsMutex_.unlock(); @@ -66,21 +93,22 @@ void DBusProxy::availabilityTimeoutThreadHandler() const { //iterate through timeouts auto it = timeouts_.begin(); while (it != timeouts_.end()) { - std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now(); + std::chrono::steady_clock::time_point now = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now(); isAvailableAsyncCallback callback = std::get<1>(*it); if (now > std::get<0>(*it)) { //timeout availabilityMutex_.lock(); + std::chrono::steady_clock::time_point timepoint_; if(isAvailable()) callbacks.push_back(std::make_tuple(callback, std::move(std::get<2>(*it)), AvailabilityStatus::AVAILABLE, - std::chrono::time_point<std::chrono::high_resolution_clock>())); + timepoint_)); else callbacks.push_back(std::make_tuple(callback, std::move(std::get<2>(*it)), AvailabilityStatus::NOT_AVAILABLE, - std::chrono::time_point<std::chrono::high_resolution_clock>())); + timepoint_)); it = timeouts_.erase(it); availabilityMutex_.unlock(); } else { @@ -131,7 +159,7 @@ void DBusProxy::availabilityTimeoutThreadHandler() const { isAvailableAsyncCallback callback; AvailabilityStatus avStatus; int remainingTimeout; - std::chrono::high_resolution_clock::time_point now; + std::chrono::steady_clock::time_point now; auto it = callbacks.begin(); while(it != callbacks.end()) { @@ -139,7 +167,7 @@ void DBusProxy::availabilityTimeoutThreadHandler() const { avStatus = std::get<2>(*it); // compute remaining timeout - now = std::chrono::high_resolution_clock::now(); + now = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now(); remainingTimeout = (int)std::chrono::duration_cast<std::chrono::milliseconds>(std::get<3>(*it) - now).count(); if(remainingTimeout < 0) remainingTimeout = 0; @@ -170,10 +198,10 @@ DBusProxy::DBusProxy(const DBusAddress &_dbusAddress, interfaceVersionAttribute_(*this, "uu", "getInterfaceVersion"), dbusServiceRegistry_(DBusServiceRegistry::get(_connection)) { - Factory::get()->incrementConnection(connection_); } void DBusProxy::init() { + selfReference_ = shared_from_this(); dbusServiceRegistrySubscription_ = dbusServiceRegistry_->subscribeAvailabilityListener( getAddress().getAddress(), std::bind(&DBusProxy::onDBusServiceInstanceStatus, this, std::placeholders::_1)); @@ -216,7 +244,7 @@ std::future<AvailabilityStatus> DBusProxy::isAvailableAsync( std::future<AvailabilityStatus> future = promise.get_future(); //set timeout point - auto timeoutPoint = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(_info->timeout_); + auto timeoutPoint = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now() + std::chrono::milliseconds(_info->timeout_); timeoutsMutex_.lock(); if(timeouts_.size() == 0) { @@ -258,6 +286,10 @@ std::future<AvailabilityStatus> DBusProxy::isAvailableAsync( return future; } +AvailabilityStatus DBusProxy::getAvailabilityStatus() const { + return availabilityStatus_; +} + ProxyStatusEvent& DBusProxy::getProxyStatusEvent() { return dbusProxyStatusEvent_; } @@ -286,7 +318,24 @@ void DBusProxy::signalInitialValueCallback(const CallStatus _status, } } +void DBusProxy::notifySpecificListener(std::weak_ptr<DBusProxy> _dbusProxy, + const ProxyStatusEvent::Listener &_listener, + const ProxyStatusEvent::Subscription _subscription) { + if(_dbusProxy.lock()) { + std::lock_guard<std::recursive_mutex> listenersLock(dbusProxyStatusEvent_.listenersMutex_); + + AvailabilityStatus itsStatus = availabilityStatus_; + if (itsStatus != AvailabilityStatus::UNKNOWN) + dbusProxyStatusEvent_.notifySpecificListener(_subscription, itsStatus); + + //add listener to list so that it can be notified about a change of availability + dbusProxyStatusEvent_.listeners_.push_back(std::make_pair(_subscription, _listener)); + } +} + void DBusProxy::onDBusServiceInstanceStatus(const AvailabilityStatus& availabilityStatus) { + //ensure, proxy survives until notification is done + auto itsSelf = selfReference_.lock(); if (availabilityStatus != availabilityStatus_) { availabilityMutex_.lock(); availabilityStatus_ = availabilityStatus; @@ -297,7 +346,11 @@ void DBusProxy::onDBusServiceInstanceStatus(const AvailabilityStatus& availabili availabilityTimeoutCondition_.notify_all(); availabilityTimeoutThreadMutex_.unlock(); - dbusProxyStatusEvent_.notifyListeners(availabilityStatus); + { + std::lock_guard<std::recursive_mutex> subscribersLock(dbusProxyStatusEvent_.listenersMutex_); + for(auto listenerIt : dbusProxyStatusEvent_.listeners_) + dbusProxyStatusEvent_.notifySpecificListener(listenerIt.first, availabilityStatus_); + } if (availabilityStatus == AvailabilityStatus::AVAILABLE) { std::lock_guard < std::mutex > queueLock(signalMemberHandlerQueueMutex_); @@ -335,9 +388,11 @@ void DBusProxy::onDBusServiceInstanceStatus(const AvailabilityStatus& availabili { std::lock_guard < std::mutex > queueLock(selectiveBroadcastHandlersMutex_); for (auto selectiveBroadcasts : selectiveBroadcastHandlers) { - std::string methodName = "subscribeFor" + selectiveBroadcasts.first + "Selective"; - connection_->sendPendingSelectiveSubscription(this, methodName, selectiveBroadcasts.second.first, - selectiveBroadcasts.second.second); + connection_->sendPendingSelectiveSubscription(this, + selectiveBroadcasts.first, + std::get<0>(selectiveBroadcasts.second), + std::get<1>(selectiveBroadcasts.second), + std::get<2>(selectiveBroadcasts.second)); } } } else { @@ -365,9 +420,11 @@ void DBusProxy::onDBusServiceInstanceStatus(const AvailabilityStatus& availabili } void DBusProxy::insertSelectiveSubscription(const std::string& interfaceMemberName, - DBusProxyConnection::DBusSignalHandler* dbusSignalHandler, uint32_t tag) { + DBusProxyConnection::DBusSignalHandler* dbusSignalHandler, uint32_t tag, + std::string interfaceMemberSignature) { std::lock_guard < std::mutex > queueLock(selectiveBroadcastHandlersMutex_); - selectiveBroadcastHandlers[interfaceMemberName] = std::make_pair(dbusSignalHandler, tag); + selectiveBroadcastHandlers[interfaceMemberName] = std::make_tuple( + dbusSignalHandler, tag, interfaceMemberSignature); } void DBusProxy::subscribeForSelectiveBroadcastOnConnection( @@ -585,6 +642,5 @@ void DBusProxy::freeDesktopGetCurrentValueForSignalListener( } } - } // namespace DBus } // namespace CommonAPI diff --git a/src/CommonAPI/DBus/DBusProxyManager.cpp b/src/CommonAPI/DBus/DBusProxyManager.cpp index e36f6e1..cedd58b 100644 --- a/src/CommonAPI/DBus/DBusProxyManager.cpp +++ b/src/CommonAPI/DBus/DBusProxyManager.cpp @@ -12,11 +12,12 @@ namespace DBus { DBusProxyManager::DBusProxyManager( DBusProxy &_proxy, - const std::string &_interfaceId) + const std::string &_dbusInterfaceId, + const std::string &_capiInterfaceId) : proxy_(_proxy), - instanceAvailabilityStatusEvent_(_proxy, _interfaceId), - interfaceId_(_interfaceId), - registry_(DBusServiceRegistry::get(_proxy.getDBusConnection())) + instanceAvailabilityStatusEvent_(_proxy, _dbusInterfaceId, _capiInterfaceId), + dbusInterfaceId_(_dbusInterfaceId), + capiInterfaceId_(_capiInterfaceId) { } @@ -28,7 +29,7 @@ DBusProxyManager::getDomain() const { const std::string & DBusProxyManager::getInterface() const { - return interfaceId_; + return capiInterfaceId_; } const ConnectionId_t & @@ -40,14 +41,16 @@ DBusProxyManager::getConnectionId() const { void DBusProxyManager::instancesAsyncCallback( + std::shared_ptr<Proxy> _proxy, const CommonAPI::CallStatus &_status, - const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict &_dict, + const std::vector<DBusAddress> &_availableServiceInstances, GetAvailableInstancesCallback &_call) { - std::vector<std::string> result; + (void)_proxy; + std::vector<std::string> itsAvailableInstances; if (_status == CommonAPI::CallStatus::SUCCESS) { - translateCommonApiAddresses(_dict, result); + translate(_availableServiceInstances, itsAvailableInstances); } - _call(_status, result); + _call(_status, itsAvailableInstances); } void @@ -55,98 +58,38 @@ DBusProxyManager::getAvailableInstances( CommonAPI::CallStatus &_status, std::vector<std::string> &_availableInstances) { _availableInstances.clear(); - DBusObjectManagerStub::DBusObjectPathAndInterfacesDict dict; - - DBusProxyHelper< - DBusSerializableArguments<>, - DBusSerializableArguments< - DBusObjectManagerStub::DBusObjectPathAndInterfacesDict - > - >::callMethodWithReply(proxy_, - DBusObjectManagerStub::getInterfaceName(), - "GetManagedObjects", - "", - &defaultCallInfo, - _status, - dict); - - if (_status == CallStatus::SUCCESS) { - translateCommonApiAddresses(dict, _availableInstances); - } + std::vector<DBusAddress> itsAvailableServiceInstances; + instanceAvailabilityStatusEvent_.getAvailableServiceInstances(_status, itsAvailableServiceInstances); + translate(itsAvailableServiceInstances, _availableInstances); } std::future<CallStatus> DBusProxyManager::getAvailableInstancesAsync( GetAvailableInstancesCallback _callback) { - return CommonAPI::DBus::DBusProxyHelper< - CommonAPI::DBus::DBusSerializableArguments<>, - CommonAPI::DBus::DBusSerializableArguments< - DBusObjectManagerStub::DBusObjectPathAndInterfacesDict - > - >::callMethodAsync( - proxy_, - DBusObjectManagerStub::getInterfaceName(), - "GetManagedObjects", - "", - &defaultCallInfo, - std::move( - std::bind( - &DBusProxyManager::instancesAsyncCallback, - this, - std::placeholders::_1, std::placeholders::_2, - _callback - ) - ), - std::tuple<DBusObjectManagerStub::DBusObjectPathAndInterfacesDict>()); + return instanceAvailabilityStatusEvent_.getAvailableServiceInstancesAsync(std::bind( + &DBusProxyManager::instancesAsyncCallback, + this, + proxy_.shared_from_this(), + std::placeholders::_1, + std::placeholders::_2, + _callback)); } void DBusProxyManager::getInstanceAvailabilityStatus( - const std::string &_address, + const std::string &_instance, CallStatus &_callStatus, AvailabilityStatus &_availabilityStatus) { - - CommonAPI::Address itsAddress("local", interfaceId_, _address); - DBusAddress itsDBusAddress; - DBusAddressTranslator::get()->translate(itsAddress, itsDBusAddress); - - _availabilityStatus = AvailabilityStatus::NOT_AVAILABLE; - if (registry_->isServiceInstanceAlive( - itsDBusAddress.getInterface(), - itsDBusAddress.getService(), - itsDBusAddress.getObjectPath())) { - _availabilityStatus = AvailabilityStatus::AVAILABLE; - } - _callStatus = CallStatus::SUCCESS; -} - -void -DBusProxyManager::instanceAliveAsyncCallback( - const AvailabilityStatus &_alive, - GetInstanceAvailabilityStatusCallback &_call, - std::shared_ptr<std::promise<CallStatus> > &_status) { - _call(CallStatus::SUCCESS, _alive); - _status->set_value(CallStatus::SUCCESS); + instanceAvailabilityStatusEvent_.getServiceInstanceAvailabilityStatus(_instance, + _callStatus, + _availabilityStatus); } std::future<CallStatus> DBusProxyManager::getInstanceAvailabilityStatusAsync( const std::string &_instance, GetInstanceAvailabilityStatusCallback _callback) { - - CommonAPI::Address itsAddress("local", interfaceId_, _instance); - - std::shared_ptr<std::promise<CallStatus> > promise = std::make_shared<std::promise<CallStatus>>(); - registry_->subscribeAvailabilityListener( - itsAddress.getAddress(), - std::bind(&DBusProxyManager::instanceAliveAsyncCallback, - this, - std::placeholders::_1, - _callback, - promise) - ); - - return promise->get_future(); + return instanceAvailabilityStatusEvent_.getServiceInstanceAvailabilityStatusAsync(_instance, _callback); } DBusProxyManager::InstanceAvailabilityStatusChangedEvent & @@ -154,31 +97,12 @@ DBusProxyManager::getInstanceAvailabilityStatusChangedEvent() { return instanceAvailabilityStatusEvent_; } -void -DBusProxyManager::translateCommonApiAddresses( - const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict &_dict, - std::vector<std::string> &_instances) { - - CommonAPI::Address itsAddress; - DBusAddress itsDBusAddress; - - // get service information from proxy - const std::string &_service = proxy_.getDBusAddress().getService(); - itsDBusAddress.setService(_service); - - for (const auto &objectPathIter : _dict) { - itsDBusAddress.setObjectPath(objectPathIter.first); - - const auto &interfacesDict = objectPathIter.second; - for (const auto &interfaceIter : interfacesDict) { - - // return only those addresses whose interface matches with ours - if (interfaceIter.first == interfaceId_) { - itsDBusAddress.setInterface(interfaceIter.first); - DBusAddressTranslator::get()->translate(itsDBusAddress, itsAddress); - _instances.push_back(itsAddress.getInstance()); - } - } +void DBusProxyManager::translate(const std::vector<DBusAddress> &_serviceInstances, + std::vector<std::string> &_instances) { + CommonAPI::Address itsCapiAddress; + for(auto itsDbusAddress : _serviceInstances) { + DBusAddressTranslator::get()->translate(itsDbusAddress, itsCapiAddress); + _instances.push_back(itsCapiAddress.getInstance()); } } diff --git a/src/CommonAPI/DBus/DBusServiceRegistry.cpp b/src/CommonAPI/DBus/DBusServiceRegistry.cpp index 65290fc..d345a9d 100644 --- a/src/CommonAPI/DBus/DBusServiceRegistry.cpp +++ b/src/CommonAPI/DBus/DBusServiceRegistry.cpp @@ -46,8 +46,6 @@ DBusServiceRegistry::remove(std::shared_ptr<DBusProxyConnection> _connection) { DBusServiceRegistry::DBusServiceRegistry(std::shared_ptr<DBusProxyConnection> dbusProxyConnection) : dbusDaemonProxy_(std::make_shared<CommonAPI::DBus::DBusDaemonProxy>(dbusProxyConnection)), initialized_(false), - servicesToResolve(0), - objectPathsToResolve(0), notificationThread_() { } @@ -57,7 +55,6 @@ DBusServiceRegistry::~DBusServiceRegistry() { } dbusDaemonProxy_->getNameOwnerChangedEvent().unsubscribe(dbusDaemonProxyNameOwnerChangedEventSubscription_); - dbusDaemonProxy_->getProxyStatusEvent().unsubscribe(dbusDaemonProxyStatusEventSubscription_); // notify only listeners of resolved services (online > offline) for (auto& dbusServiceListenersIterator : dbusServiceListenersMap) { @@ -85,10 +82,6 @@ DBusServiceRegistry::~DBusServiceRegistry() { void DBusServiceRegistry::init() { translator_ = DBusAddressTranslator::get(); - dbusDaemonProxyStatusEventSubscription_ = - dbusDaemonProxy_->getProxyStatusEvent().subscribe( - std::bind(&DBusServiceRegistry::onDBusDaemonProxyStatusEvent, this, std::placeholders::_1)); - dbusDaemonProxyNameOwnerChangedEventSubscription_ = dbusDaemonProxy_->getNameOwnerChangedEvent().subscribe( std::bind(&DBusServiceRegistry::onDBusDaemonProxyNameOwnerChangedEvent, @@ -173,6 +166,8 @@ DBusServiceRegistry::subscribeAvailabilityListener( } dbusInterfaceNameListenersRecord.listenerList.push_front(std::move(serviceListener)); + dbusInterfaceNameListenersRecord.listenersToRemove.remove( + dbusInterfaceNameListenersRecord.listenerList.begin()); dbusServicesMutex_.unlock(); @@ -217,15 +212,8 @@ DBusServiceRegistry::unsubscribeAvailabilityListener( auto& dbusInterfaceNameListenersRecord = dbusInterfaceNameListenersIterator->second; - dbusInterfaceNameListenersRecord.listenerList.erase(listenerSubscription); - - if (dbusInterfaceNameListenersRecord.listenerList.empty()) { - dbusInterfaceNameListenersMap.erase(dbusInterfaceNameListenersIterator); - - if (dbusInterfaceNameListenersMap.empty()) { - dbusServiceListenersRecord.dbusObjectPathListenersMap.erase(dbusObjectPathListenersIterator); - } - } + // mark listener to remove + dbusInterfaceNameListenersRecord.listenersToRemove.push_back(listenerSubscription); dbusServicesMutex_.unlock(); } @@ -234,6 +222,7 @@ DBusServiceRegistry::unsubscribeAvailabilityListener( bool DBusServiceRegistry::isServiceInstanceAlive(const std::string& dbusInterfaceName, const std::string& dbusServiceName, const std::string& dbusObjectPath) { + bool result = false; std::chrono::milliseconds timeout(1000); bool uniqueNameFound = false; @@ -267,9 +256,8 @@ bool DBusServiceRegistry::isServiceInstanceAlive(const std::string& dbusInterfac std::shared_future<DBusRecordState> futureNameResolved = insertedDbusServiceListenerRecord.first->second.futureOnResolve; futureNameResolved.wait_for(timeout); - if(futureNameResolved.get() != DBusRecordState::RESOLVED) { + if(futureNameResolved.get() != DBusRecordState::RESOLVED) return false; - } dbusServicesMutex_.lock(); auto dbusServiceListenersMapIterator = dbusServiceListenersMap.find(dbusServiceName); @@ -296,68 +284,73 @@ bool DBusServiceRegistry::isServiceInstanceAlive(const std::string& dbusInterfac dbusUniqueNameRecord = &dbusUniqueNameRecordIterator->second; } - dbusServicesMutex_.unlock(); - if (NULL == dbusUniqueNameRecord) { COMMONAPI_ERROR(std::string(__FUNCTION__), " no unique name record found for IF: ", dbusInterfaceName, " service: ", dbusServiceName, "object path: ", dbusObjectPath); } - auto& dbusObjectPathsCache = dbusUniqueNameRecord->dbusObjectPathsCache; - auto dbusObjectPathCacheIterator = dbusObjectPathsCache.find(dbusObjectPath); + if(dbusPredefinedServices_.find(dbusServiceName) == dbusPredefinedServices_.end()) { - DBusObjectPathCache* dbusObjectPathCache = NULL; + auto& dbusObjectPathsCache = dbusUniqueNameRecord->dbusObjectPathsCache; + auto dbusObjectPathCacheIterator = dbusObjectPathsCache.find(dbusObjectPath); - if(dbusObjectPathCacheIterator != dbusObjectPathsCache.end()) { - dbusObjectPathCache = &(dbusObjectPathCacheIterator->second); - if (dbusObjectPathCache->state != DBusRecordState::RESOLVED) { - dbusObjectPathCache->state = DBusRecordState::RESOLVING; - dbusServicesMutex_.lock(); + DBusObjectPathCache* dbusObjectPathCache = NULL; + if(dbusObjectPathCacheIterator != dbusObjectPathsCache.end()) { dbusObjectPathCache = &(dbusObjectPathCacheIterator->second); + if (dbusObjectPathCache->state != DBusRecordState::RESOLVED) { + dbusObjectPathCache->state = DBusRecordState::RESOLVING; - std::future<DBusRecordState> futureObjectPathResolved = dbusObjectPathCache->promiseOnResolve.get_future(); - dbusServicesMutex_.unlock(); + dbusObjectPathCache = &(dbusObjectPathCacheIterator->second); - introspectDBusObjectPath(uniqueName, dbusObjectPath); - futureObjectPathResolved.wait_for(timeout); + std::future<DBusRecordState> futureObjectPathResolved = dbusObjectPathCache->promiseOnResolve.get_future(); + dbusServicesMutex_.unlock(); + + resolveObjectPathWithObjectManager(uniqueName, dbusObjectPath); + futureObjectPathResolved.wait_for(timeout); + } else { + dbusServicesMutex_.unlock(); + } } - } - else { - // try to resolve object paths - DBusObjectPathCache newDbusObjectPathCache; - newDbusObjectPathCache.state = DBusRecordState::RESOLVING; - newDbusObjectPathCache.serviceName = dbusServiceName; + else { + // try to resolve object paths + DBusObjectPathCache newDbusObjectPathCache; + newDbusObjectPathCache.state = DBusRecordState::RESOLVING; + newDbusObjectPathCache.serviceName = dbusServiceName; - dbusServicesMutex_.lock(); + dbusObjectPathsCache.insert(std::make_pair(dbusObjectPath, std::move(newDbusObjectPathCache))); - dbusObjectPathsCache.insert(std::make_pair(dbusObjectPath, std::move(newDbusObjectPathCache))); + dbusObjectPathCacheIterator = dbusObjectPathsCache.find(dbusObjectPath); - dbusObjectPathCacheIterator = dbusObjectPathsCache.find(dbusObjectPath); + dbusObjectPathCache = &(dbusObjectPathCacheIterator->second); - dbusObjectPathCache = &(dbusObjectPathCacheIterator->second); + newDbusObjectPathCache.futureOnResolve = dbusObjectPathCache->promiseOnResolve.get_future(); + dbusServicesMutex_.unlock(); - newDbusObjectPathCache.futureOnResolve = dbusObjectPathCache->promiseOnResolve.get_future(); - dbusServicesMutex_.unlock(); + resolveObjectPathWithObjectManager(uniqueName, dbusObjectPath); + newDbusObjectPathCache.futureOnResolve.wait_for(timeout); + } - introspectDBusObjectPath(uniqueName, dbusObjectPath); - newDbusObjectPathCache.futureOnResolve.wait_for(timeout); - } + if (NULL == dbusObjectPathCache) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " no object path cache entry found for IF: ", dbusInterfaceName, + " service: ", dbusServiceName, "object path: ", dbusObjectPath); + } - if (NULL == dbusObjectPathCache) { - COMMONAPI_ERROR(std::string(__FUNCTION__), " no object path cache entry found for IF: ", dbusInterfaceName, - " service: ", dbusServiceName, "object path: ", dbusObjectPath); - } + dbusServicesMutex_.lock(); + if(dbusObjectPathCache->state != DBusRecordState::RESOLVED) { + dbusServicesMutex_.unlock(); + return false; + } - dbusServicesMutex_.lock(); - if(dbusObjectPathCache->state != DBusRecordState::RESOLVED) { + auto dbusInterfaceNamesIterator = dbusObjectPathCache->dbusInterfaceNamesCache.find(dbusInterfaceName); + result = dbusInterfaceNamesIterator != dbusObjectPathCache->dbusInterfaceNamesCache.end(); dbusServicesMutex_.unlock(); - return false; - } - auto dbusInterfaceNamesIterator = dbusObjectPathCache->dbusInterfaceNamesCache.find(dbusInterfaceName); - bool result = dbusInterfaceNamesIterator != dbusObjectPathCache->dbusInterfaceNamesCache.end(); - dbusServicesMutex_.unlock(); + } else { + //service is predefined + result = true; + dbusServicesMutex_.unlock(); + } return(result); } @@ -372,6 +365,7 @@ void DBusServiceRegistry::fetchAllServiceNames() { dbusDaemonProxy_->listNames(callStatus, availableServiceNames); + dbusServicesMutex_.lock(); if (callStatus == CallStatus::SUCCESS) { for(std::string serviceName : availableServiceNames) { if(isDBusServiceName(serviceName)) { @@ -379,114 +373,27 @@ void DBusServiceRegistry::fetchAllServiceNames() { } } } + dbusServicesMutex_.unlock(); } -// d-feet mode -std::vector<std::string> DBusServiceRegistry::getAvailableServiceInstances(const std::string& interfaceName, - const std::string& domainName) { - (void)domainName; - std::vector<std::string> availableServiceInstances; - - // resolve all service names - for (auto serviceNameIterator = dbusServiceNameMap_.begin(); - serviceNameIterator != dbusServiceNameMap_.end(); - serviceNameIterator++) { - - std::string serviceName = serviceNameIterator->first; - DBusUniqueNameRecord* dbusUniqueNameRecord = serviceNameIterator->second; - - if(dbusUniqueNameRecord == NULL) { - DBusServiceListenersRecord& serviceListenerRecord = dbusServiceListenersMap[serviceName]; - if(serviceListenerRecord.uniqueBusNameState != DBusRecordState::RESOLVING) { - resolveDBusServiceName(serviceName, serviceListenerRecord); - } - } - } - - std::mutex mutexResolveAllServices; - std::unique_lock<std::mutex> lockResolveAllServices(mutexResolveAllServices); - std::chrono::milliseconds timeout(5000); - - monitorResolveAllServices_.wait_for(lockResolveAllServices, timeout, [&] { - mutexServiceResolveCount.lock(); - bool finished = servicesToResolve == 0; - mutexServiceResolveCount.unlock(); - - return finished; - }); - - for (auto serviceNameIterator = dbusServiceNameMap_.begin(); - serviceNameIterator != dbusServiceNameMap_.end(); - serviceNameIterator++) { - - std::string serviceName = serviceNameIterator->first; - DBusUniqueNameRecord* dbusUniqueNameRecord = serviceNameIterator->second; - - if(dbusUniqueNameRecord != NULL) { - if(dbusUniqueNameRecord->objectPathsState == DBusRecordState::UNKNOWN) { - DBusObjectPathCache& rootObjectPathCache = dbusUniqueNameRecord->dbusObjectPathsCache["/"]; - if(rootObjectPathCache.state == DBusRecordState::UNKNOWN) { - rootObjectPathCache.state = DBusRecordState::RESOLVING; - introspectDBusObjectPath(dbusUniqueNameRecord->uniqueName, "/"); - } - } - } - } - - std::mutex mutexResolveAllObjectPaths; - std::unique_lock<std::mutex> lockResolveAllObjectPaths(mutexResolveAllObjectPaths); - - // TODO: Check if should use the remaining timeout not "used" during wait before - monitorResolveAllObjectPaths_.wait_for(lockResolveAllObjectPaths, timeout, [&] { - mutexServiceResolveCount.lock(); - bool finished = objectPathsToResolve == 0; - mutexServiceResolveCount.unlock(); - - return finished; - }); - - for (auto serviceNameIterator = dbusServiceNameMap_.begin(); - serviceNameIterator != dbusServiceNameMap_.end(); - serviceNameIterator++) { - - std::string serviceName = serviceNameIterator->first; - DBusUniqueNameRecord* dbusUniqueNameRecord = serviceNameIterator->second; - - if(dbusUniqueNameRecord != NULL) { - if(dbusUniqueNameRecord->objectPathsState == DBusRecordState::RESOLVED) { - for (auto dbusObjectPathCacheIterator = dbusUniqueNameRecord->dbusObjectPathsCache.begin(); - dbusObjectPathCacheIterator != dbusUniqueNameRecord->dbusObjectPathsCache.end(); - dbusObjectPathCacheIterator++) { - if (dbusObjectPathCacheIterator->second.state == DBusRecordState::RESOLVED) { - if (dbusObjectPathCacheIterator->second.dbusInterfaceNamesCache.find(interfaceName) - != dbusObjectPathCacheIterator->second.dbusInterfaceNamesCache.end()) { - std::string commonApiAddress; - translator_->translate( - dbusObjectPathCacheIterator->first + "/" + interfaceName + "/" + serviceName, commonApiAddress); - availableServiceInstances.push_back(commonApiAddress); - } - } - } - } - } - } - - // maybe partial list but it contains everything we know for now - return availableServiceInstances; +void DBusServiceRegistry::getAvailableServiceInstances(const std::string& dbusServiceName, + const std::string& dbusObjectPath, + DBusObjectManagerStub::DBusObjectPathAndInterfacesDict& availableServiceInstances) { + getManagedObjects(dbusServiceName, dbusObjectPath, availableServiceInstances); } -void DBusServiceRegistry::getAvailableServiceInstancesAsync(CommonAPI::Factory::AvailableInstancesCbk_t _cbk, - const std::string &_interface, - const std::string &_domain) { - //Necessary as service discovery might need some time, but the async version of "getAvailableServiceInstances" - //shall return without delay. - std::thread( - [this, _cbk, _interface, _domain](std::shared_ptr<DBusServiceRegistry> selfRef) { - (void)selfRef; - auto instances = getAvailableServiceInstances(_interface, _domain); - _cbk(instances); - }, this->shared_from_this() - ).detach(); +void DBusServiceRegistry::getAvailableServiceInstancesAsync(GetAvailableServiceInstancesCallback callback, + const std::string& dbusServiceName, + const std::string& dbusObjectPath) { + getManagedObjectsAsync(dbusServiceName, dbusObjectPath, [callback](const CallStatus& callStatus, + const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict availableServiceInstances, + const std::string& dbusServiceName, + const std::string& dbusObjectPath) { + (void)callStatus; + (void)dbusServiceName; + (void)dbusObjectPath; + callback(availableServiceInstances); + }); } void DBusServiceRegistry::onSignalDBusMessage(const DBusMessage &_dbusMessage) { @@ -498,7 +405,8 @@ void DBusServiceRegistry::onSignalDBusMessage(const DBusMessage &_dbusMessage) { if (!_dbusMessage.hasInterfaceName("org.freedesktop.DBus.ObjectManager")) { COMMONAPI_ERROR(std::string(__FUNCTION__), " unexpected interface ", _dbusMessage.getInterface()); } - if (!_dbusMessage.hasMemberName("InterfacesAdded") && !_dbusMessage.hasMemberName("InterfacesAdded")) { + if (!_dbusMessage.hasMemberName("InterfacesAdded") && !_dbusMessage.hasMemberName("InterfacesAdded") && + !_dbusMessage.hasMemberName("InterfacesRemoved") && !_dbusMessage.hasMemberName("InterfacesRemoved") ) { COMMONAPI_ERROR(std::string(__FUNCTION__), " unexpected member ", _dbusMessage.getMember()); } @@ -598,10 +506,6 @@ void DBusServiceRegistry::resolveDBusServiceName(const std::string& dbusServiceN COMMONAPI_ERROR(std::string(__FUNCTION__), " unique name not empty ", dbusServiceListenersRecord.uniqueBusName); } - mutexServiceResolveCount.lock(); - servicesToResolve++; - mutexServiceResolveCount.unlock(); - if (dbusDaemonProxy_->isAvailable()) { auto func = std::bind( @@ -648,11 +552,6 @@ void DBusServiceRegistry::onGetNameOwnerCallback(const CallStatus& status, onDBusServiceNotAvailable(dbusServiceListenersRecord, dbusServiceName); } - mutexServiceResolveCount.lock(); - servicesToResolve--; - mutexServiceResolveCount.unlock(); - monitorResolveAllServices_.notify_all(); - dbusServicesMutex_.unlock(); } @@ -788,28 +687,51 @@ bool DBusServiceRegistry::resolveObjectPathWithObjectManager(const std::string& std::placeholders::_2, dbusServiceUniqueName, dbusObjectPath); - return getManagedObjects(dbusServiceUniqueName, "/", getManagedObjectsCallback); + return getManagedObjectsAsync(dbusServiceUniqueName, "/", getManagedObjectsCallback); } -bool DBusServiceRegistry::getManagedObjects(const std::string& dbusServiceUniqueName, - const std::string& dbusObjectPath, - GetManagedObjectsCallback callback) { - bool isSendingInProgress = false; +bool DBusServiceRegistry::getManagedObjects(const std::string& dbusServiceName, + const std::string& dbusObjectPath, + DBusObjectManagerStub::DBusObjectPathAndInterfacesDict& availableServiceInstances) { auto dbusConnection = dbusDaemonProxy_->getDBusConnection(); - if (dbusServiceUniqueName.empty()) { - COMMONAPI_ERROR(std::string(__FUNCTION__), " dbusServiceUniqueName empty"); + if (dbusServiceName.empty()) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " dbusServiceName empty"); } if(dbusConnection->isConnected()) { - if(dbusObjectPath != "/") { - mutexObjectPathsResolveCount.lock(); - objectPathsToResolve++; - mutexObjectPathsResolveCount.unlock(); - } + DBusAddress dbusAddress(dbusServiceName, dbusObjectPath, "org.freedesktop.DBus.ObjectManager"); + DBusMessage dbusMessageCall = CommonAPI::DBus::DBusMessage::createMethodCall( + dbusAddress, + "GetManagedObjects"); + + DBusError error; + CallInfo* defaultCallInfo = new CallInfo(); + DBusMessage reply = dbusConnection->sendDBusMessageWithReplyAndBlock(dbusMessageCall, error, defaultCallInfo); + delete defaultCallInfo; + + DBusInputStream input(reply); + if (!DBusSerializableArguments<DBusObjectManagerStub::DBusObjectPathAndInterfacesDict>::deserialize( + input, availableServiceInstances) || error) + return false; + } + return true; +} + +bool DBusServiceRegistry::getManagedObjectsAsync(const std::string& dbusServiceName, + const std::string& dbusObjectPath, + GetManagedObjectsCallback callback) { + bool isSendingInProgress = false; + auto dbusConnection = dbusDaemonProxy_->getDBusConnection(); + + if (dbusServiceName.empty()) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " dbusServiceName empty"); + } + + if(dbusConnection->isConnected()) { - DBusAddress dbusAddress(dbusServiceUniqueName, dbusObjectPath, "org.freedesktop.DBus.ObjectManager"); + DBusAddress dbusAddress(dbusServiceName, dbusObjectPath, "org.freedesktop.DBus.ObjectManager"); DBusMessage dbusMessageCall = CommonAPI::DBus::DBusMessage::createMethodCall( dbusAddress, "GetManagedObjects"); @@ -818,7 +740,7 @@ bool DBusServiceRegistry::getManagedObjects(const std::string& dbusServiceUnique callback, std::placeholders::_1, std::placeholders::_2, - dbusServiceUniqueName, + dbusServiceName, dbusObjectPath); DBusProxyAsyncCallbackHandler< @@ -840,14 +762,14 @@ bool DBusServiceRegistry::getManagedObjects(const std::string& dbusServiceUnique } void DBusServiceRegistry::onGetManagedObjectsCallbackResolve(const CallStatus& callStatus, - const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict dbusObjectPathAndInterfacesDict, + const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict availableServiceInstances, const std::string& dbusServiceUniqueName, const std::string& dbusObjectPath) { if(callStatus == CallStatus::SUCCESS) { //has object manager bool objectPathFound = false; - for(auto objectPathDict : dbusObjectPathAndInterfacesDict) + for(auto objectPathDict : availableServiceInstances) { std::string objectPath = objectPathDict.first; if(objectPath != dbusObjectPath) @@ -863,16 +785,6 @@ void DBusServiceRegistry::onGetManagedObjectsCallbackResolve(const CallStatus& c processManagedObject(dbusObjectPath, dbusServiceUniqueName, interfaceName); dbusServicesMutex_.unlock(); } - - // resolve further managed objects - auto callback = std::bind( - &DBusServiceRegistry::onGetManagedObjectsCallbackResolveFurther, - this->shared_from_this(), - std::placeholders::_1, - std::placeholders::_2, - std::placeholders::_3, - std::placeholders::_4); - getManagedObjects(dbusServiceUniqueName, dbusObjectPath, callback); } if(!objectPathFound) { @@ -885,193 +797,41 @@ void DBusServiceRegistry::onGetManagedObjectsCallbackResolve(const CallStatus& c dbusServiceUniqueName, dbusObjectPath); std::string objectPathManager = dbusObjectPath.substr(0, dbusObjectPath.find_last_of("\\/")); - getManagedObjects(dbusServiceUniqueName, objectPathManager, getManagedObjectsCallback); + getManagedObjectsAsync(dbusServiceUniqueName, objectPathManager, getManagedObjectsCallback); } } else { COMMONAPI_ERROR("There is no Object Manager that manages " + dbusObjectPath + ". Resolving failed!"); } } -void DBusServiceRegistry::onGetManagedObjectsCallbackResolveFurther(const CallStatus& callStatus, - const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict dbusObjectPathAndInterfacesDict, - const std::string& dbusServiceUniqueName, - const std::string& dbusObjectPath) { - - if(callStatus == CallStatus::SUCCESS) { - for(auto objectPathDict : dbusObjectPathAndInterfacesDict) - { - //resolve - std::string objectPath = objectPathDict.first; - CommonAPI::DBus::DBusObjectManagerStub::DBusInterfacesAndPropertiesDict interfacesAndPropertiesDict = objectPathDict.second; - for(auto interfaceDict : interfacesAndPropertiesDict) - { - std::string interfaceName = interfaceDict.first; - dbusServicesMutex_.lock(); - processManagedObject(objectPath, dbusServiceUniqueName, interfaceName); - dbusServicesMutex_.unlock(); - } - - // resolve further managed objects - auto callback = std::bind( - &DBusServiceRegistry::onGetManagedObjectsCallbackResolveFurther, - this->shared_from_this(), - std::placeholders::_1, - std::placeholders::_2, - std::placeholders::_3, - std::placeholders::_4); - getManagedObjects(dbusServiceUniqueName, objectPath, callback); - } - } else { - // No further managed objects - } - - dbusServicesMutex_.lock(); +void DBusServiceRegistry::processManagedObject(const std::string& dbusObjectPath, + const std::string& dbusServiceUniqueName, + const std::string& interfaceName) { auto dbusServiceUniqueNameIterator = dbusUniqueNamesMap_.find(dbusServiceUniqueName); const bool isDBusServiceUniqueNameFound = (dbusServiceUniqueNameIterator != dbusUniqueNamesMap_.end()); - if (!isDBusServiceUniqueNameFound) { - dbusServicesMutex_.unlock(); + if (!isDBusServiceUniqueNameFound) return; - } DBusUniqueNameRecord& dbusUniqueNameRecord = dbusServiceUniqueNameIterator->second; auto dbusObjectPathIterator = dbusUniqueNameRecord.dbusObjectPathsCache.find(dbusObjectPath); const bool isDBusObjectPathFound = (dbusObjectPathIterator != dbusUniqueNameRecord.dbusObjectPathsCache.end()); - if (!isDBusObjectPathFound) { - dbusServicesMutex_.unlock(); + if (!isDBusObjectPathFound) return; - } DBusObjectPathCache& dbusObjectPathRecord = dbusObjectPathIterator->second; - dbusObjectPathRecord.state = DBusRecordState::RESOLVED; - if(dbusObjectPathRecord.futureOnResolve.valid()) { - dbusObjectPathRecord.promiseOnResolve.set_value(dbusObjectPathRecord.state); - } - mutexObjectPathsResolveCount.lock(); - objectPathsToResolve--; - mutexObjectPathsResolveCount.unlock(); - monitorResolveAllObjectPaths_.notify_all(); - - dbusUniqueNameRecord.objectPathsState = DBusRecordState::RESOLVED; - - notifyDBusServiceListeners( - dbusUniqueNameRecord, - dbusObjectPath, - dbusObjectPathRecord.dbusInterfaceNamesCache, - DBusRecordState::RESOLVED); - - dbusServicesMutex_.unlock(); -} - -void DBusServiceRegistry::processManagedObject(const std::string& dbusObjectPath, - const std::string& dbusServiceUniqueName, - const std::string& interfaceName) { - DBusUniqueNameRecord& dbusUniqueNameRecord = dbusUniqueNamesMap_[dbusServiceUniqueName]; - DBusObjectPathCache& dbusObjectPathCache = dbusUniqueNameRecord.dbusObjectPathsCache[dbusObjectPath]; - if(!isOrgFreedesktopDBusInterface(interfaceName)) { - dbusObjectPathCache.dbusInterfaceNamesCache.insert(interfaceName); + dbusObjectPathRecord.dbusInterfaceNamesCache.insert(interfaceName); } else if (translator_->isOrgFreedesktopDBusPeerMapped() && (interfaceName == "org.freedesktop.DBus.Peer")) { - dbusObjectPathCache.dbusInterfaceNamesCache.insert(interfaceName); - } -} - -bool DBusServiceRegistry::introspectDBusObjectPath(const std::string& dbusServiceUniqueName, - const std::string& dbusObjectPath) { - bool isResolvingInProgress = false; - auto dbusConnection = dbusDaemonProxy_->getDBusConnection(); - - if (dbusServiceUniqueName.empty()) { - COMMONAPI_ERROR(std::string(__FUNCTION__), " dbusServiceUniqueName empty"); + dbusObjectPathRecord.dbusInterfaceNamesCache.insert(interfaceName); } - if (dbusConnection->isConnected()) { - mutexObjectPathsResolveCount.lock(); - objectPathsToResolve++; - mutexObjectPathsResolveCount.unlock(); - - DBusAddress dbusAddress(dbusServiceUniqueName, dbusObjectPath, "org.freedesktop.DBus.Introspectable"); - DBusMessage dbusMessageCall = DBusMessage::createMethodCall( - dbusAddress, - "Introspect"); - auto instrospectAsyncCallback = std::bind( - &DBusServiceRegistry::onIntrospectCallback, - this->shared_from_this(), - std::placeholders::_1, - std::placeholders::_2, - dbusServiceUniqueName, - dbusObjectPath); - - DBusProxyAsyncCallbackHandler< - DBusServiceRegistry, - std::string - >::Delegate delegate(shared_from_this(), instrospectAsyncCallback); - - dbusConnection->sendDBusMessageWithReplyAsync( - dbusMessageCall, - DBusProxyAsyncCallbackHandler< - DBusServiceRegistry, - std::string - >::create(delegate, std::tuple<std::string>()), - &serviceRegistryInfo); - - isResolvingInProgress = true; - } - return isResolvingInProgress; -} - -/** - * Callback for org.freedesktop.DBus.Introspectable.Introspect - * - * This is the other end of checking if a dbus object path is available. - * On success it'll extract all interface names from the xml data response. - * Special interfaces that start with "org.freedesktop.DBus." will be ignored. - * - * @param status - * @param xmlData - * @param dbusServiceUniqueName - * @param dbusObjectPath - */ -void DBusServiceRegistry::onIntrospectCallback(const CallStatus& callStatus, - std::string xmlData, - const std::string& dbusServiceUniqueName, - const std::string& dbusObjectPath) { - if (callStatus == CallStatus::SUCCESS) { - parseIntrospectionData(xmlData, dbusObjectPath, dbusServiceUniqueName); - } - - dbusServicesMutex_.lock(); - - // Error CallStatus will result in empty parsedDBusInterfaceNameSet (and not available notification) - - auto dbusServiceUniqueNameIterator = dbusUniqueNamesMap_.find(dbusServiceUniqueName); - const bool isDBusServiceUniqueNameFound = (dbusServiceUniqueNameIterator != dbusUniqueNamesMap_.end()); - - if (!isDBusServiceUniqueNameFound) { - dbusServicesMutex_.unlock(); - return; - } - - DBusUniqueNameRecord& dbusUniqueNameRecord = dbusServiceUniqueNameIterator->second; - auto dbusObjectPathIterator = dbusUniqueNameRecord.dbusObjectPathsCache.find(dbusObjectPath); - const bool isDBusObjectPathFound = (dbusObjectPathIterator != dbusUniqueNameRecord.dbusObjectPathsCache.end()); - - if (!isDBusObjectPathFound) { - dbusServicesMutex_.unlock(); - return; - } - - DBusObjectPathCache& dbusObjectPathRecord = dbusObjectPathIterator->second; - dbusObjectPathRecord.state = DBusRecordState::RESOLVED; - dbusObjectPathRecord.promiseOnResolve.set_value(dbusObjectPathRecord.state); - mutexObjectPathsResolveCount.lock(); - objectPathsToResolve--; - mutexObjectPathsResolveCount.unlock(); - monitorResolveAllObjectPaths_.notify_all(); + if(dbusObjectPathRecord.futureOnResolve.valid()) + dbusObjectPathRecord.promiseOnResolve.set_value(dbusObjectPathRecord.state); dbusUniqueNameRecord.objectPathsState = DBusRecordState::RESOLVED; @@ -1080,104 +840,6 @@ void DBusServiceRegistry::onIntrospectCallback(const CallStatus& callStatus, dbusObjectPath, dbusObjectPathRecord.dbusInterfaceNamesCache, DBusRecordState::RESOLVED); - - dbusServicesMutex_.unlock(); -} - -void DBusServiceRegistry::parseIntrospectionNode(const pugi::xml_node& node, const std::string& rootObjectPath, const std::string& fullObjectPath, const std::string& dbusServiceUniqueName) { - std::string nodeName; - - for(pugi::xml_node& subNode : node.children()) { - nodeName = std::string(subNode.name()); - - if(nodeName == "node") { - processIntrospectionObjectPath(subNode, rootObjectPath, dbusServiceUniqueName); - } - - if(nodeName == "interface") { - processIntrospectionInterface(subNode, rootObjectPath, fullObjectPath, dbusServiceUniqueName); - } - } -} - -void DBusServiceRegistry::processIntrospectionObjectPath(const pugi::xml_node& node, const std::string& rootObjectPath, const std::string& dbusServiceUniqueName) { - std::string fullObjectPath = rootObjectPath; - - if(fullObjectPath.at(fullObjectPath.length()-1) != '/') { - fullObjectPath += "/"; - } - - fullObjectPath += std::string(node.attribute("name").as_string()); - - DBusUniqueNameRecord& dbusUniqueNameRecord = dbusUniqueNamesMap_[dbusServiceUniqueName]; - DBusObjectPathCache& dbusObjectPathCache = dbusUniqueNameRecord.dbusObjectPathsCache[fullObjectPath]; - - if(dbusObjectPathCache.state == DBusRecordState::UNKNOWN) { - dbusObjectPathCache.state = DBusRecordState::RESOLVING; - introspectDBusObjectPath(dbusServiceUniqueName, fullObjectPath); - } - - for(pugi::xml_node subNode : node.children()) { - parseIntrospectionNode(subNode, fullObjectPath, fullObjectPath, dbusServiceUniqueName); - } -} - -void DBusServiceRegistry::processIntrospectionInterface(const pugi::xml_node& node, const std::string& rootObjectPath, const std::string& fullObjectPath, const std::string& dbusServiceUniqueName) { - std::string interfaceName = node.attribute("name").as_string(); - DBusUniqueNameRecord& dbusUniqueNameRecord = dbusUniqueNamesMap_[dbusServiceUniqueName]; - DBusObjectPathCache& dbusObjectPathCache = dbusUniqueNameRecord.dbusObjectPathsCache[fullObjectPath]; - - if(!isOrgFreedesktopDBusInterface(interfaceName)) { - dbusObjectPathCache.dbusInterfaceNamesCache.insert(interfaceName); - } else if (translator_->isOrgFreedesktopDBusPeerMapped() && (interfaceName == "org.freedesktop.DBus.Peer")) { - dbusObjectPathCache.dbusInterfaceNamesCache.insert(interfaceName); - } - - for(pugi::xml_node subNode : node.children()) { - parseIntrospectionNode(subNode, rootObjectPath, fullObjectPath, dbusServiceUniqueName); - } -} - -void DBusServiceRegistry::parseIntrospectionData(const std::string& xmlData, - const std::string& rootObjectPath, - const std::string& dbusServiceUniqueName) { - pugi::xml_document xmlDocument; - pugi::xml_parse_result parsedResult = xmlDocument.load_buffer(xmlData.c_str(), xmlData.length(), pugi::parse_minimal, pugi::encoding_utf8); - - if(parsedResult.status != pugi::xml_parse_status::status_ok) { - return; - } - - const pugi::xml_node rootNode = xmlDocument.child("node"); - - dbusServicesMutex_.lock(); - - parseIntrospectionNode(rootNode, rootObjectPath, rootObjectPath, dbusServiceUniqueName); - - dbusUniqueNamesMap_[dbusServiceUniqueName]; - dbusServicesMutex_.unlock(); -} - - -void DBusServiceRegistry::onDBusDaemonProxyStatusEvent(const AvailabilityStatus& availabilityStatus) { - if (availabilityStatus == AvailabilityStatus::UNKNOWN) { - COMMONAPI_ERROR(std::string(__FUNCTION__), " unexpected availability status ", int(availabilityStatus)); - } - - dbusServicesMutex_.lock(); - - for (auto& dbusServiceListenersIterator : dbusServiceListenersMap) { - const auto& dbusServiceName = dbusServiceListenersIterator.first; - auto& dbusServiceListenersRecord = dbusServiceListenersIterator.second; - - if (availabilityStatus == AvailabilityStatus::AVAILABLE) { - resolveDBusServiceName(dbusServiceName, dbusServiceListenersRecord); - } else { - onDBusServiceNotAvailable(dbusServiceListenersRecord, dbusServiceName); - } - } - - dbusServicesMutex_.unlock(); } void DBusServiceRegistry::checkDBusServiceWasAvailable(const std::string& dbusServiceName, @@ -1378,6 +1040,9 @@ void DBusServiceRegistry::notifyDBusObjectPathChanged(DBusInterfaceNameListeners auto& dbusInterfaceNameListenersRecord = dbusInterfaceNameListenersIterator->second; notifyDBusInterfaceNameListeners(dbusInterfaceNameListenersRecord, isDBusInterfaceNameAvailable); + + if (dbusInterfaceNameListenersRecord.listenerList.empty()) + dbusInterfaceNameListenersMap.erase(dbusInterfaceNameListenersIterator); } } } @@ -1394,10 +1059,20 @@ void DBusServiceRegistry::notifyDBusInterfaceNameListeners(DBusInterfaceNameList } dbusInterfaceNameListenersRecord.state = notifyState; - for (auto dbusServiceListenerIterator = dbusInterfaceNameListenersRecord.listenerList.begin(); - dbusServiceListenerIterator != dbusInterfaceNameListenersRecord.listenerList.end(); - dbusServiceListenerIterator++) { - (*dbusServiceListenerIterator)(availabilityStatus); + auto dbusServiceListenerIterator = dbusInterfaceNameListenersRecord.listenerList.begin(); + while (dbusServiceListenerIterator != dbusInterfaceNameListenersRecord.listenerList.end()) { + + auto itsRemoveListenerIt = std::find(dbusInterfaceNameListenersRecord.listenersToRemove.begin(), + dbusInterfaceNameListenersRecord.listenersToRemove.end(), + dbusServiceListenerIterator); + + if(itsRemoveListenerIt != dbusInterfaceNameListenersRecord.listenersToRemove.end()) { + dbusInterfaceNameListenersRecord.listenersToRemove.remove(dbusServiceListenerIterator); + dbusServiceListenerIterator = dbusInterfaceNameListenersRecord.listenerList.erase(dbusServiceListenerIterator); + } else { + (*dbusServiceListenerIterator)(availabilityStatus); + ++dbusServiceListenerIterator; + } } } diff --git a/src/CommonAPI/DBus/DBusStubAdapter.cpp b/src/CommonAPI/DBus/DBusStubAdapter.cpp index 422a419..0f86cc1 100644 --- a/src/CommonAPI/DBus/DBusStubAdapter.cpp +++ b/src/CommonAPI/DBus/DBusStubAdapter.cpp @@ -16,12 +16,11 @@ DBusStubAdapter::DBusStubAdapter(const DBusAddress &_dbusAddress, : dbusAddress_(_dbusAddress), connection_(_connection), isManaging_(_isManaging) { - Factory::get()->incrementConnection(connection_); } DBusStubAdapter::~DBusStubAdapter() { deinit(); - Factory::get()->decrementConnection(connection_); + Factory::get()->unregisterStub(address_.getDomain(), address_.getInterface(), address_.getInstance()); } void DBusStubAdapter::init(std::shared_ptr<DBusStubAdapter> _instance) { |