diff options
Diffstat (limited to 'src/CommonAPI/DBus/DBusConnection.cpp')
-rw-r--r-- | src/CommonAPI/DBus/DBusConnection.cpp | 301 |
1 files changed, 219 insertions, 82 deletions
diff --git a/src/CommonAPI/DBus/DBusConnection.cpp b/src/CommonAPI/DBus/DBusConnection.cpp index 0ce70bc..003aea9 100644 --- a/src/CommonAPI/DBus/DBusConnection.cpp +++ b/src/CommonAPI/DBus/DBusConnection.cpp @@ -16,7 +16,6 @@ #include <CommonAPI/DBus/DBusProxy.hpp> #include <CommonAPI/DBus/DBusAddressTranslator.hpp> - namespace CommonAPI { namespace DBus { @@ -46,6 +45,93 @@ void DBusConnectionStatusEvent::onListenerAdded(const Listener &_listener, const _listener(AvailabilityStatus::AVAILABLE); } +// Helper class to ensure that CommonAPI::Runtime (static instance) is not destroyed unless last +// connection is closed. Also tries to safely join dispatch threads (~CompletionHelper will be +// executed in main thread's context on program exit), in case DBusConnection::disconnect() has +// been invoked from dispatch thread itself, thus "self-joining" would have raised an exception. +class CompletionHelper { +public: + + static std::unique_ptr<CompletionHelper> & get() { + static std::unique_ptr<CompletionHelper> theCompleter = std::unique_ptr<CompletionHelper>(new CompletionHelper()); + return theCompleter; + } + + ~CompletionHelper() { + destructing_ = true; + + std::set<std::uintptr_t>::size_type activeConnections(connections_.max_size()); + { + std::lock_guard<std::mutex> lock(mutex_); + activeConnections = connections_.size(); + } + + bool forceDetach(false); + if (0u != activeConnections) { + std::future<bool> ready = readyToCleanup_.get_future(); + if (ready.valid()) { + const std::future_status status = ready.wait_for(std::chrono::seconds(1)); + forceDetach = (std::future_status::ready != status); + } + } + + { + std::lock_guard<std::mutex> lock(mutex_); + for (std::thread * p : threads_) { + if (nullptr != p) { + if (!forceDetach && p->joinable()) { + p->join(); + } else { + p->detach(); + } + delete p; + } + } + } + }; + + void registerConnection(std::uintptr_t conn) { + std::lock_guard<std::mutex> lock(mutex_); + connections_.insert(conn); + } + + void unregisterConnection(std::uintptr_t conn) { + std::lock_guard<std::mutex> lock(mutex_); + if ((1u == connections_.erase(conn)) && (0u == connections_.size()) && destructing_) { + readyToCleanup_.set_value(true); + } + } + + void joinOnExit(std::uintptr_t conn, std::thread & t) { + std::lock_guard<std::mutex> lock(mutex_); + if (connections_.end() != connections_.find(conn)) { + bool found(false); + std::vector<std::thread*>::const_iterator it = threads_.begin(); + while (!found && it != threads_.end()) { + found = (&t == *it++); + } + if (!found) { + threads_.push_back(&t); + } + } + } + +private: + + CompletionHelper() + : destructing_(false) { + } + + CompletionHelper(CompletionHelper const &) = delete; + void operator= (CompletionHelper const &) = delete; + + std::mutex mutex_; + std::vector<std::thread *> threads_; + std::set<std::uintptr_t> connections_; + + std::promise<bool> readyToCleanup_; + std::atomic<bool> destructing_; +}; const DBusObjectPathVTable* DBusConnection::getDBusObjectPathVTable() { static const DBusObjectPathVTable libdbusObjectPathVTable = { @@ -130,57 +216,6 @@ DBusConnection::~DBusConnection() { delete watchContext_; delete timeoutContext_; } - - //Assert that the enforcerThread_ is in a position to finish itself correctly even after destruction - //of the DBusConnection. Also assert all resources are cleaned up. - auto it = timeoutMap_.begin(); - while (it != timeoutMap_.end()) { - DBusMessageReplyAsyncHandler* asyncHandler = std::get<1>(it->second); - DBusPendingCall* libdbusPendingCall = it->first; - - asyncHandler->lock(); - bool executionStarted = asyncHandler->getExecutionStarted(); - bool executionFinished = asyncHandler->getExecutionFinished(); - if (executionStarted && !executionFinished) { - asyncHandler->setHasToBeDeleted(); - it = timeoutMap_.erase(it); - asyncHandler->unlock(); - continue; - } - if (!executionStarted && !executionFinished && !dbus_pending_call_get_completed(libdbusPendingCall)) { - dbus_pending_call_cancel(libdbusPendingCall); - } - asyncHandler->unlock(); - - if (!executionStarted && !executionFinished) { - DBusMessage& dbusMessageCall = std::get<2>(it->second); - - asyncHandler->onDBusMessageReply(CallStatus::REMOTE_ERROR, dbusMessageCall.createMethodError(DBUS_ERROR_TIMEOUT)); - - dbus_pending_call_unref(libdbusPendingCall); - } - it = timeoutMap_.erase(it); - delete asyncHandler; - } - - auto itTimeoutInf = timeoutInfiniteAsyncHandlers_.begin(); - while (itTimeoutInf != timeoutInfiniteAsyncHandlers_.end()) { - DBusMessageReplyAsyncHandler* asyncHandler = (*itTimeoutInf); - - asyncHandler->lock(); - bool executionStarted = asyncHandler->getExecutionStarted(); - bool executionFinished = asyncHandler->getExecutionFinished(); - if (executionStarted && !executionFinished) { - asyncHandler->setHasToBeDeleted(); - itTimeoutInf = timeoutInfiniteAsyncHandlers_.erase(itTimeoutInf); - asyncHandler->unlock(); - continue; - } - asyncHandler->unlock(); - - itTimeoutInf = timeoutInfiniteAsyncHandlers_.erase(itTimeoutInf); - delete asyncHandler; - } } bool DBusConnection::attachMainLoopContext(std::weak_ptr<MainLoopContext> mainLoopContext) { @@ -402,8 +437,7 @@ bool DBusConnection::connect(DBusError &dbusError, bool startDispatchThread) { initLibdbusSignalFilterAfterConnect(); - enforcerThread_ = std::make_shared<std::thread>( - std::bind(&DBusConnection::enforceAsynchronousTimeouts, shared_from_this())); + enforcerThread_ = new std::thread(std::bind(&DBusConnection::enforceAsynchronousTimeouts, shared_from_this())); dbusConnectionStatusEvent_.notifyListeners(AvailabilityStatus::AVAILABLE); @@ -415,6 +449,11 @@ bool DBusConnection::connect(DBusError &dbusError, bool startDispatchThread) { dispatchThread_ = new std::thread(std::bind(&DBusConnection::dispatch, shared_from_this())); } + std::unique_ptr<CompletionHelper> & helper = CompletionHelper::get(); + if (helper) { + helper->registerConnection(reinterpret_cast<std::uintptr_t>(this)); + } + return true; } @@ -452,16 +491,21 @@ void DBusConnection::disconnect() { dbus_connection_close(connection_); + std::unique_ptr<CompletionHelper> & helper = CompletionHelper::get(); + if(dispatchThread_) { loop_->stop(); //It is possible for the disconnect to be called from within a callback, i.e. from within the dispatch //thread. Self-join is prevented this way. if (dispatchThread_->joinable() && std::this_thread::get_id() != dispatchThread_->get_id()) { dispatchThread_->join(); + delete dispatchThread_; + } else if (helper) { + helper->joinOnExit(reinterpret_cast<std::uintptr_t>(this), *dispatchThread_); } else { dispatchThread_->detach(); + delete dispatchThread_; } - delete dispatchThread_; dispatchThread_ = NULL; } @@ -474,10 +518,17 @@ void DBusConnection::disconnect() { if (enforcerThread_->joinable() && std::this_thread::get_id() != enforcerThread_->get_id()) { enforcerThread_->join(); + delete enforcerThread_; + } else if(helper) { + helper->joinOnExit(reinterpret_cast<std::uintptr_t>(this), *enforcerThread_); } else { enforcerThread_->detach(); } + if (helper) { + helper->unregisterConnection(reinterpret_cast<std::uintptr_t>(this)); + } + // remote mainloop watchers dbus_connection_set_watch_functions(connection_, NULL, NULL, NULL, NULL, NULL); dbus_connection_set_timeout_functions(connection_, NULL, NULL, NULL, NULL, NULL); @@ -599,7 +650,7 @@ void DBusConnection::onLibdbusPendingCall(::DBusPendingCall* _libdbusPendingCall CallStatus callStatus = CallStatus::SUCCESS; if (_reply.isErrorType() || !_reply.isMethodReturnType()) { - if(strcmp(_reply.getError(), DBUS_ERROR_UNKNOWN_METHOD) == 0) { + if (std::string(_reply.getError()) == std::string(DBUS_ERROR_UNKNOWN_METHOD)) { callStatus = CallStatus::NOT_AVAILABLE; } else { callStatus = CallStatus::REMOTE_ERROR; @@ -810,6 +861,47 @@ void DBusConnection::enforceAsynchronousTimeouts() { } } + { + std::lock_guard<std::mutex> itsLock(enforceTimeoutMutex_); + auto it = timeoutMap_.begin(); + while (it != timeoutMap_.end()) { + DBusMessageReplyAsyncHandler* asyncHandler = std::get<1>(it->second); + + asyncHandler->lock(); + bool executionStarted = asyncHandler->getExecutionStarted(); + bool executionFinished = asyncHandler->getExecutionFinished(); + if (executionStarted && !executionFinished) { + asyncHandler->setHasToBeDeleted(); + it = timeoutMap_.erase(it); + asyncHandler->unlock(); + continue; + } + asyncHandler->unlock(); + + it = timeoutMap_.erase(it); + asyncHandlersToDelete_.push_back(asyncHandler); + } + + auto itTimeoutInf = timeoutInfiniteAsyncHandlers_.begin(); + while (itTimeoutInf != timeoutInfiniteAsyncHandlers_.end()) { + DBusMessageReplyAsyncHandler* asyncHandler = (*itTimeoutInf); + + asyncHandler->lock(); + bool executionStarted = asyncHandler->getExecutionStarted(); + bool executionFinished = asyncHandler->getExecutionFinished(); + if (executionStarted && !executionFinished) { + asyncHandler->setHasToBeDeleted(); + itTimeoutInf = timeoutInfiniteAsyncHandlers_.erase(itTimeoutInf); + asyncHandler->unlock(); + continue; + } + asyncHandler->unlock(); + + itTimeoutInf = timeoutInfiniteAsyncHandlers_.erase(itTimeoutInf); + asyncHandlersToDelete_.push_back(asyncHandler); + } + } + // delete left async handlers that could not be deleted by the main loop deleteAsyncHandlers(); } @@ -1159,7 +1251,7 @@ DBusProxyConnection::DBusSignalHandlerToken DBusConnection::addSignalMemberHandl if(signalHandlerPathToAddIt == dbusSignalHandlersToAdd_.end()) { // is first signal member handler for this 'dbusSignalHandlerPath' --> add - std::map<DBusSignalHandler*, std::weak_ptr<DBusSignalHandler>> handlerList; + std::map<const DBusSignalHandler*, std::weak_ptr<DBusSignalHandler>> handlerList; handlerList[itsHandler.get()] = dbusSignalHandler; dbusSignalHandlersToAdd_.insert( { @@ -1210,7 +1302,7 @@ bool DBusConnection::removeSignalMemberHandler(const DBusSignalHandlerToken &dbu //check if signal handler is already added auto signalHandlerPathIt = dbusSignalHandlers_.find(dbusSignalHandlerToken); if(signalHandlerPathIt != dbusSignalHandlers_.end() && - signalHandlerPathIt->second.find(const_cast<DBusSignalHandler*>(dbusSignalHandler)) != + signalHandlerPathIt->second.find(dbusSignalHandler) != signalHandlerPathIt->second.end()) { // signal handler is already added @@ -1218,19 +1310,19 @@ bool DBusConnection::removeSignalMemberHandler(const DBusSignalHandlerToken &dbu auto signalHandlerPathToRemoveIt = dbusSignalHandlersToRemove_.find(dbusSignalHandlerToken); if(signalHandlerPathToRemoveIt != dbusSignalHandlersToRemove_.end()) { - auto it = signalHandlerPathToRemoveIt->second.find(const_cast<DBusSignalHandler*>(dbusSignalHandler)); + auto it = signalHandlerPathToRemoveIt->second.find(dbusSignalHandler); if(it == signalHandlerPathToRemoveIt->second.end()) { // handler is not going to be removed yet --> remove - signalHandlerPathToRemoveIt->second[const_cast<DBusSignalHandler*>(dbusSignalHandler)] = + signalHandlerPathToRemoveIt->second[dbusSignalHandler] = std::weak_ptr<DBusSignalHandler>(); } } else { // handler is not going to be removed yet. No dbus signal handler token found --> insert with handler to remove - std::map<DBusSignalHandler*, std::weak_ptr<DBusSignalHandler>> handlerList; - handlerList[const_cast<DBusSignalHandler*>(dbusSignalHandler)] = + std::map<const DBusSignalHandler*, std::weak_ptr<DBusSignalHandler>> handlerList; + handlerList[dbusSignalHandler] = std::weak_ptr<DBusSignalHandler>(); dbusSignalHandlersToRemove_.insert( { @@ -1245,7 +1337,7 @@ bool DBusConnection::removeSignalMemberHandler(const DBusSignalHandlerToken &dbu auto signalHandlerPathToAddIt = dbusSignalHandlersToAdd_.find(dbusSignalHandlerToken); if(signalHandlerPathToAddIt != dbusSignalHandlersToAdd_.end()) { - auto handlersToAddEntry = signalHandlerPathToAddIt->second.find(const_cast<DBusSignalHandler*>(dbusSignalHandler)); + auto handlersToAddEntry = signalHandlerPathToAddIt->second.find(dbusSignalHandler); if(handlersToAddEntry != signalHandlerPathToAddIt->second.end()) { // handler is planned to be added --> erase @@ -1287,7 +1379,7 @@ bool DBusConnection::addObjectManagerSignalMemberHandler(const std::string& dbus if(signalHandlerPathToAddIt == dbusOMSignalHandlersToAdd_.end()) { // is first signal member handler --> add to list and add match rule - std::pair<DBusSignalHandler*, std::weak_ptr<DBusSignalHandler>> handler = + std::pair<const DBusSignalHandler*, std::weak_ptr<DBusSignalHandler>> handler = std::make_pair(itsHandler.get(), dbusSignalHandler); dbusOMSignalHandlersToAdd_.insert( { @@ -1321,7 +1413,7 @@ bool DBusConnection::addObjectManagerSignalMemberHandler(const std::string& dbus } bool DBusConnection::removeObjectManagerSignalMemberHandler(const std::string& dbusBusName, - DBusSignalHandler* dbusSignalHandler) { + const DBusSignalHandler* dbusSignalHandler) { if (dbusBusName.empty()) { COMMONAPI_ERROR(std::string(__FUNCTION__), " empty dbusBusName"); return false; @@ -1346,7 +1438,7 @@ bool DBusConnection::removeObjectManagerSignalMemberHandler(const std::string& d } } else { // no dbus signal handler found for 'dbusBusName' --> insert with handler - std::pair<DBusSignalHandler*, std::weak_ptr<DBusSignalHandler>> handler = + std::pair<const DBusSignalHandler*, std::weak_ptr<DBusSignalHandler>> handler = std::make_pair(dbusSignalHandler, std::weak_ptr<DBusSignalHandler>()); dbusOMSignalHandlersToRemove_.insert( { @@ -1651,23 +1743,23 @@ void DBusConnection::removeLibdbusSignalMatchRule(const std::string& objectPath, if (!matchRuleFound) { COMMONAPI_ERROR(std::string(__FUNCTION__), " no match rule found for path: ", objectPath, "interface: ", interfaceName, " member: ", interfaceMemberName); - } - - uint32_t& matchRuleReferenceCount = matchRuleIterator->second.first; - if (matchRuleReferenceCount > 1) { - matchRuleReferenceCount--; - return; - } + } else { + uint32_t& matchRuleReferenceCount = matchRuleIterator->second.first; + if (matchRuleReferenceCount > 1) { + matchRuleReferenceCount--; + return; + } - if (isConnected()) { - const std::string& matchRuleString = matchRuleIterator->second.second; - const bool libdbusSuccess = removeLibdbusSignalMatchRule(matchRuleString); - if (!libdbusSuccess) { - COMMONAPI_ERROR(std::string(__FUNCTION__), " removeLibdbusSignalMatchRule failed ", matchRuleString); + if (isConnected()) { + const std::string& matchRuleString = matchRuleIterator->second.second; + const bool libdbusSuccess = removeLibdbusSignalMatchRule(matchRuleString); + if (!libdbusSuccess) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " removeLibdbusSignalMatchRule failed ", matchRuleString); + } } - } - dbusSignalMatchRulesMap_.erase(matchRuleIterator); + dbusSignalMatchRulesMap_.erase(matchRuleIterator); + } } void DBusConnection::initLibdbusObjectPathHandlerAfterConnect() { @@ -1787,7 +1879,7 @@ void DBusConnection::notifyDBusSignalHandlers(DBusSignalHandlerPath handlerPath, auto signalHandlerPathIt = dbusSignalHandlers_.find(handlerPath); if(signalHandlerPathIt == dbusSignalHandlers_.end()) { - std::map<DBusSignalHandler*, std::weak_ptr<DBusSignalHandler>> handlerList; + std::map<const DBusSignalHandler*, std::weak_ptr<DBusSignalHandler>> handlerList; handlerList[handlerToAddIt->first] = handlerToAddIt->second; dbusSignalHandlers_.insert( { @@ -1803,7 +1895,7 @@ void DBusConnection::notifyDBusSignalHandlers(DBusSignalHandlerPath handlerPath, } // ensure, the registry survives - std::shared_ptr<DBusServiceRegistry> itsRegistry_ = DBusServiceRegistry::get(shared_from_this()); + std::shared_ptr<DBusServiceRegistry> itsRegistry_ = DBusServiceRegistry::get(shared_from_this(), false); // notify auto signalHandlerPathIt = dbusSignalHandlers_.find(handlerPath); @@ -2041,5 +2133,50 @@ uint32_t DBusConnection::getNumberOfSignalMemberHandlers(DBusSignalHandlerPath h return handlers - handlersToRemove + handlersToAdd; } +void DBusConnection::addSignalStateHandler( + std::shared_ptr<DBusProxyConnection::DBusSignalHandler> _handler, + const uint32_t _subscription) { + { + std::lock_guard<std::mutex> itsLock(signalStateHandlersMutex_); + signalStateHandlers_[_handler].insert(_subscription); + } + auto function = std::bind(&DBusProxyConnection::DBusSignalHandler::onSpecificError, + _handler, std::placeholders::_1, std::placeholders::_2); + + proxyPushFunctionToMainLoop(function, CommonAPI::CallStatus::SUCCESS, _subscription); +} + +void DBusConnection::removeSignalStateHandler( + std::shared_ptr<DBusProxyConnection::DBusSignalHandler> _handler, + const uint32_t _tag, bool _remove_all) { + { + std::lock_guard<std::mutex> itsLock(signalStateHandlersMutex_); + if (_remove_all) { + signalStateHandlers_.erase(_handler); + } else { + auto itsHandler = signalStateHandlers_.find(_handler); + if (itsHandler != signalStateHandlers_.end()) { + itsHandler->second.erase(_tag); + if (itsHandler->second.size() == 0) { + signalStateHandlers_.erase(_handler); + } + } + } + } +} + +void DBusConnection::handleSignalStates() { + std::map<std::shared_ptr<DBusProxyConnection::DBusSignalHandler>, std::set<uint32_t>> tmpHandlers; + { + std::lock_guard<std::mutex> itsLock(signalStateHandlersMutex_); + tmpHandlers = signalStateHandlers_; + } + for (auto itsHandler : tmpHandlers) { + for (uint32_t tag : itsHandler.second) { + itsHandler.first->onSpecificError(CommonAPI::CallStatus::SUCCESS, tag); + } + } +} + } // namespace DBus } // namespace CommonAPI |