diff options
Diffstat (limited to 'src/CommonAPI/DBus')
-rw-r--r-- | src/CommonAPI/DBus/DBusConnection.cpp | 168 | ||||
-rw-r--r-- | src/CommonAPI/DBus/DBusFunctionalHash.cpp | 36 | ||||
-rw-r--r-- | src/CommonAPI/DBus/DBusInstanceAvailabilityStatusChangedEvent.cpp | 49 | ||||
-rwxr-xr-x | src/CommonAPI/DBus/DBusMainLoop.cpp | 86 | ||||
-rw-r--r-- | src/CommonAPI/DBus/DBusMainLoopContext.cpp | 240 | ||||
-rw-r--r-- | src/CommonAPI/DBus/DBusMessage.cpp | 4 | ||||
-rw-r--r-- | src/CommonAPI/DBus/DBusProxy.cpp | 59 | ||||
-rw-r--r-- | src/CommonAPI/DBus/DBusProxyBase.cpp | 7 | ||||
-rw-r--r-- | src/CommonAPI/DBus/DBusServiceRegistry.cpp | 73 |
9 files changed, 405 insertions, 317 deletions
diff --git a/src/CommonAPI/DBus/DBusConnection.cpp b/src/CommonAPI/DBus/DBusConnection.cpp index 23967a9..f34ff70 100644 --- a/src/CommonAPI/DBus/DBusConnection.cpp +++ b/src/CommonAPI/DBus/DBusConnection.cpp @@ -124,19 +124,13 @@ DBusConnection::~DBusConnection() { dbus_connection_set_timeout_functions(connection_, NULL, NULL, NULL, NULL, NULL); } - lockedContext->deregisterWatch(queueWatch_); lockedContext->deregisterDispatchSource(queueDispatchSource_); lockedContext->deregisterDispatchSource(dispatchSource_); + lockedContext->deregisterWatch(queueWatch_); delete watchContext_; delete timeoutContext_; } - // ensure, the registry survives until disconnecting is done... - //std::shared_ptr<DBusServiceRegistry> itsRegistry = DBusServiceRegistry::get(shared_from_this()); - - // Disconnecting not possible because of circular dependency, the destructor will be called AFTER disconnect anyway. - //disconnect(); - //Assert that the enforcerThread_ is in a position to finish itself correctly even after destruction //of the DBusConnection. Also assert all resources are cleaned up. auto it = timeoutMap_.begin(); @@ -916,11 +910,6 @@ DBusMessage DBusConnection::sendDBusMessageWithReplyAndBlock(const DBusMessage& COMMONAPI_DEBUG("Message sent: SenderID: ", _info->sender_, " - Serial number: ", dbusMessage.getSerial()); } - if (dbusError) { - COMMONAPI_ERROR(std::string(__FUNCTION__), "dbusError set"); - return DBusMessage(); - } - const bool increaseLibdbusMessageReferenceCount = false; return DBusMessage(libdbusMessageReply, increaseLibdbusMessageReferenceCount); } @@ -1020,8 +1009,11 @@ bool DBusConnection::setDispatching(bool _isDispatching) { } } -void DBusConnection::sendPendingSelectiveSubscription(DBusProxy* callingProxy, std::string interfaceMemberName, - DBusSignalHandler* dbusSignalHandler, uint32_t tag, std::string interfaceMemberSignature) { +void DBusConnection::sendPendingSelectiveSubscription(DBusProxy* callingProxy, + std::string interfaceMemberName, + std::weak_ptr<DBusSignalHandler> dbusSignalHandler, + uint32_t tag, + std::string interfaceMemberSignature) { bool outarg; std::string methodName = "subscribeFor" + interfaceMemberName + "Selective"; DBusProxyHelper<CommonAPI::DBus::DBusSerializableArguments<>, @@ -1032,14 +1024,17 @@ void DBusConnection::sendPendingSelectiveSubscription(DBusProxy* callingProxy, s (const CommonAPI::CallStatus& callStatus, const bool& accepted) { if (callStatus == CommonAPI::CallStatus::SUCCESS && accepted) { - dbusSignalHandler->onSpecificError(CommonAPI::CallStatus::SUCCESS, tag); + if(auto itsHandler = dbusSignalHandler.lock()) + itsHandler->onSpecificError(CommonAPI::CallStatus::SUCCESS, tag); } else { const DBusSignalHandlerPath itsToken(callingProxy->getDBusAddress().getObjectPath(), callingProxy->getDBusAddress().getInterface(), interfaceMemberName, interfaceMemberSignature); - removeSignalMemberHandler(itsToken, dbusSignalHandler); - dbusSignalHandler->onSpecificError(CommonAPI::CallStatus::SUBSCRIPTION_REFUSED, tag); + if(auto itsHandler = dbusSignalHandler.lock()) { + removeSignalMemberHandler(itsToken, itsHandler.get()); + itsHandler->onSpecificError(CommonAPI::CallStatus::SUBSCRIPTION_REFUSED, tag); + } } }, std::make_tuple(outarg)); } @@ -1049,43 +1044,50 @@ void DBusConnection::subscribeForSelectiveBroadcast( const std::string& interfaceName, const std::string& interfaceMemberName, const std::string& interfaceMemberSignature, - DBusSignalHandler* dbusSignalHandler, + std::weak_ptr<DBusSignalHandler> dbusSignalHandler, DBusProxy* callingProxy, uint32_t tag) { - std::string methodName = "subscribeFor" + interfaceMemberName + "Selective"; - - DBusProxyConnection::DBusSignalHandlerToken token = addSignalMemberHandler( - objectPath, - interfaceName, - interfaceMemberName, - interfaceMemberSignature, - dbusSignalHandler, - true - ); - dbusSignalHandler->setSubscriptionToken(token, tag); - - bool outarg; - if(callingProxy->isAvailable()) { - DBusProxyHelper<CommonAPI::DBus::DBusSerializableArguments<>, - CommonAPI::DBus::DBusSerializableArguments<bool>>::callMethodAsync( - *callingProxy, methodName.c_str(), "", - &CommonAPI::DBus::defaultCallInfo, - [this, objectPath, interfaceName, interfaceMemberName, interfaceMemberSignature, dbusSignalHandler, callingProxy, tag, token] - (const CommonAPI::CallStatus& callStatus, const bool& accepted) { - (void)callStatus; - if (accepted) { - dbusSignalHandler->onSpecificError(CommonAPI::CallStatus::SUCCESS, tag); - } else { - removeSignalMemberHandler(token, dbusSignalHandler); - dbusSignalHandler->onSpecificError(CommonAPI::CallStatus::SUBSCRIPTION_REFUSED, tag); - } - }, std::make_tuple(outarg)); + if(auto itsHandler = dbusSignalHandler.lock()) { + + std::string methodName = "subscribeFor" + interfaceMemberName + "Selective"; + + DBusSignalHandlerToken token = addSignalMemberHandler( + objectPath, + interfaceName, + interfaceMemberName, + interfaceMemberSignature, + dbusSignalHandler, + true + ); + + itsHandler->setSubscriptionToken(token, tag); + + bool outarg; + if(callingProxy->isAvailable()) { + DBusProxyHelper<CommonAPI::DBus::DBusSerializableArguments<>, + CommonAPI::DBus::DBusSerializableArguments<bool>>::callMethodAsync( + *callingProxy, methodName.c_str(), "", + &CommonAPI::DBus::defaultCallInfo, + [this, objectPath, interfaceName, interfaceMemberName, interfaceMemberSignature, dbusSignalHandler, callingProxy, tag, token] + (const CommonAPI::CallStatus& callStatus, const bool& accepted) { + (void)callStatus; + if (accepted) { + if(auto itsHandler = dbusSignalHandler.lock()) + itsHandler->onSpecificError(CommonAPI::CallStatus::SUCCESS, tag); + } else { + if(auto itsHandler = dbusSignalHandler.lock()) { + removeSignalMemberHandler(token, itsHandler.get()); + itsHandler->onSpecificError(CommonAPI::CallStatus::SUBSCRIPTION_REFUSED, tag); + } + } + }, std::make_tuple(outarg)); + } } } void DBusConnection::unsubscribeFromSelectiveBroadcast(const std::string& eventName, - DBusProxyConnection::DBusSignalHandlerToken subscription, + DBusSignalHandlerToken subscription, DBusProxy* callingProxy, const DBusSignalHandler* dbusSignalHandler) { bool lastListenerOnConnectionRemoved = removeSignalMemberHandler(subscription, dbusSignalHandler); @@ -1104,7 +1106,7 @@ DBusProxyConnection::DBusSignalHandlerToken DBusConnection::addSignalMemberHandl const std::string& interfaceName, const std::string& interfaceMemberName, const std::string& interfaceMemberSignature, - DBusSignalHandler* dbusSignalHandler, + std::weak_ptr<DBusSignalHandler> dbusSignalHandler, const bool justAddFilter) { DBusSignalHandlerPath dbusSignalHandlerPath( objectPath, @@ -1115,18 +1117,20 @@ DBusProxyConnection::DBusSignalHandlerToken DBusConnection::addSignalMemberHandl auto signalEntry = dbusSignalHandlerTable_.find(dbusSignalHandlerPath); const bool isFirstSignalMemberHandler = (signalEntry == dbusSignalHandlerTable_.end()); - if (isFirstSignalMemberHandler) { + auto itsHandler = dbusSignalHandler.lock(); + if (itsHandler && isFirstSignalMemberHandler) { addLibdbusSignalMatchRule(objectPath, interfaceName, interfaceMemberName, justAddFilter); - std::set<DBusSignalHandler*> handlerList; - handlerList.insert(dbusSignalHandler); + + std::map<DBusSignalHandler*, std::weak_ptr<DBusSignalHandler>> handlerList; + handlerList[itsHandler.get()] = dbusSignalHandler; dbusSignalHandlerTable_.insert( { dbusSignalHandlerPath, std::make_pair(std::make_shared<std::recursive_mutex>(), std::move(handlerList)) } ); - } else { + } else if (itsHandler && !isFirstSignalMemberHandler) { signalEntry->second.first->lock(); - signalEntry->second.second.insert(dbusSignalHandler); + signalEntry->second.second[itsHandler.get()] = dbusSignalHandler; signalEntry->second.first->unlock(); } @@ -1134,7 +1138,7 @@ DBusProxyConnection::DBusSignalHandlerToken DBusConnection::addSignalMemberHandl } bool DBusConnection::removeSignalMemberHandler(const DBusSignalHandlerToken &dbusSignalHandlerToken, - const DBusSignalHandler *dbusSignalHandler) { + const DBusSignalHandler* dbusSignalHandler) { bool lastHandlerRemoved = false; std::lock_guard < std::mutex > dbusSignalLock(signalGuard_); @@ -1161,41 +1165,43 @@ bool DBusConnection::removeSignalMemberHandler(const DBusSignalHandlerToken &dbu } bool DBusConnection::addObjectManagerSignalMemberHandler(const std::string& dbusBusName, - DBusSignalHandler* dbusSignalHandler) { + std::weak_ptr<DBusSignalHandler> dbusSignalHandler) { if (dbusBusName.length() < 2) { return false; } - std::lock_guard<std::mutex> dbusSignalLock(dbusObjectManagerSignalGuard_); + if(auto itsHandler = dbusSignalHandler.lock()) { + std::lock_guard<std::mutex> dbusSignalLock(dbusObjectManagerSignalGuard_); - auto dbusSignalMatchRuleIterator = dbusObjectManagerSignalMatchRulesMap_.find(dbusBusName); - const bool isDBusSignalMatchRuleFound = (dbusSignalMatchRuleIterator != dbusObjectManagerSignalMatchRulesMap_.end()); + auto dbusSignalMatchRuleIterator = dbusObjectManagerSignalMatchRulesMap_.find(dbusBusName); + const bool isDBusSignalMatchRuleFound = (dbusSignalMatchRuleIterator != dbusObjectManagerSignalMatchRulesMap_.end()); - if (!isDBusSignalMatchRuleFound) { - if (isConnected() && !addObjectManagerSignalMatchRule(dbusBusName)) { - return false; - } + if (!isDBusSignalMatchRuleFound) { + if (isConnected() && !addObjectManagerSignalMatchRule(dbusBusName)) { + return false; + } - auto insertResult = dbusObjectManagerSignalMatchRulesMap_.insert({ dbusBusName, 0 }); - const bool isInsertSuccessful = insertResult.second; + auto insertResult = dbusObjectManagerSignalMatchRulesMap_.insert({ dbusBusName, 0 }); + const bool isInsertSuccessful = insertResult.second; - if (!isInsertSuccessful) { - if (isConnected()) { - const bool isRemoveSignalMatchRuleSuccessful = removeObjectManagerSignalMatchRule(dbusBusName); - if (!isRemoveSignalMatchRuleSuccessful) { - COMMONAPI_ERROR(std::string(__FUNCTION__), " removeObjectManagerSignalMatchRule", dbusBusName, " failed"); + if (!isInsertSuccessful) { + if (isConnected()) { + const bool isRemoveSignalMatchRuleSuccessful = removeObjectManagerSignalMatchRule(dbusBusName); + if (!isRemoveSignalMatchRuleSuccessful) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " removeObjectManagerSignalMatchRule", dbusBusName, " failed"); + } } + return false; } - return false; + + dbusSignalMatchRuleIterator = insertResult.first; } - dbusSignalMatchRuleIterator = insertResult.first; + size_t &dbusSignalMatchRuleReferenceCount = dbusSignalMatchRuleIterator->second; + dbusSignalMatchRuleReferenceCount++; + dbusObjectManagerSignalHandlerTable_.insert( { dbusBusName, std::make_pair(itsHandler.get(), dbusSignalHandler) } ); } - size_t &dbusSignalMatchRuleReferenceCount = dbusSignalMatchRuleIterator->second; - dbusSignalMatchRuleReferenceCount++; - dbusObjectManagerSignalHandlerTable_.insert( { dbusBusName, dbusSignalHandler } ); - return true; } @@ -1219,7 +1225,7 @@ bool DBusConnection::removeObjectManagerSignalMemberHandler(const std::string& d auto dbusObjectManagerSignalHandlerIterator = std::find_if( dbusObjectManagerSignalHandlerRange.first, dbusObjectManagerSignalHandlerRange.second, - [&](decltype(*dbusObjectManagerSignalHandlerRange.first)& it) { return it.second == dbusSignalHandler; }); + [&](decltype(*dbusObjectManagerSignalHandlerRange.first)& it) { return it.second.first == dbusSignalHandler; }); const bool isDBusSignalHandlerFound = (dbusObjectManagerSignalHandlerIterator != dbusObjectManagerSignalHandlerRange.second); if (!isDBusSignalHandlerFound) { return false; @@ -1574,8 +1580,9 @@ void notifyDBusSignalHandlers(DBusSignalHandlersTable& dbusSignalHandlerstable, auto handlerEntry = signalEntry->second.second.begin(); while (handlerEntry != signalEntry->second.second.end()) { - DBusProxyConnection::DBusSignalHandler* dbusSignalHandler = *handlerEntry; - dbusSignalHandler->onSignalDBusMessage(dbusMessage); + std::weak_ptr<DBusProxyConnection::DBusSignalHandler> dbusSignalHandler = handlerEntry->second; + if(auto itsHandler = dbusSignalHandler.lock()) + itsHandler->onSignalDBusMessage(dbusMessage); handlerEntry++; } dbusHandlerResult = DBUS_HANDLER_RESULT_HANDLED; @@ -1593,8 +1600,9 @@ void notifyDBusOMSignalHandlers(DBusSignalHandlersTable& dbusSignalHandlerstable dbusHandlerResult = DBUS_HANDLER_RESULT_HANDLED; } while (equalRange.first != equalRange.second) { - DBusProxyConnection::DBusSignalHandler* dbusSignalHandler = equalRange.first->second; - dbusSignalHandler->onSignalDBusMessage(dbusMessage); + std::weak_ptr<DBusProxyConnection::DBusSignalHandler> dbusSignalHandler = equalRange.first->second.second; + if(auto itsHandler = dbusSignalHandler.lock()) + itsHandler->onSignalDBusMessage(dbusMessage); equalRange.first++; } } diff --git a/src/CommonAPI/DBus/DBusFunctionalHash.cpp b/src/CommonAPI/DBus/DBusFunctionalHash.cpp index e82d578..5e11bfc 100644 --- a/src/CommonAPI/DBus/DBusFunctionalHash.cpp +++ b/src/CommonAPI/DBus/DBusFunctionalHash.cpp @@ -33,10 +33,10 @@ size_t hash<pair<const char*, const char*> >::operator()(const pair<const char*, uint32_t seed = static_cast<uint32_t>(SMHASHER_SEED_VALUE); if (NULL != a) { - MurmurHash3_x86_32(a, static_cast<int>(strlen(a)), seed, &seed); + MurmurHash3_x86_32(a, static_cast<unsigned int>(strlen(a)), seed, &seed); } if (NULL != b) { - MurmurHash3_x86_32(b, static_cast<int>(strlen(b)), seed, &seed); + MurmurHash3_x86_32(b, static_cast<unsigned int>(strlen(b)), seed, &seed); } return static_cast<size_t>(seed); @@ -44,7 +44,7 @@ size_t hash<pair<const char*, const char*> >::operator()(const pair<const char*, size_t hash<const char*>::operator()(const char* const t) const { uint32_t seed = static_cast<uint32_t>(SMHASHER_SEED_VALUE); - MurmurHash3_x86_32(t, static_cast<int>(strlen(t)), seed, &seed); + MurmurHash3_x86_32(t, static_cast<unsigned int>(strlen(t)), seed, &seed); return static_cast<size_t>(seed); } @@ -53,8 +53,8 @@ size_t hash<pair<string, string> >::operator()(const pair<string, string>& t) co const string& b = t.second; uint32_t seed = static_cast<uint32_t>(SMHASHER_SEED_VALUE); - MurmurHash3_x86_32(a.c_str(), static_cast<int>(a.length()), seed, &seed); - MurmurHash3_x86_32(b.c_str(), static_cast<int>(b.length()), seed, &seed); + MurmurHash3_x86_32(a.c_str(), static_cast<unsigned int>(a.length()), seed, &seed); + MurmurHash3_x86_32(b.c_str(), static_cast<unsigned int>(b.length()), seed, &seed); return static_cast<size_t>(seed); } @@ -65,9 +65,9 @@ size_t hash<tuple<string, string, string> >::operator()(const tuple<string, stri const string& c = get<2>(t); uint32_t seed = static_cast<uint32_t>(SMHASHER_SEED_VALUE); - MurmurHash3_x86_32(a.c_str(), static_cast<int>(a.length()), seed, &seed); - MurmurHash3_x86_32(b.c_str(), static_cast<int>(b.length()), seed, &seed); - MurmurHash3_x86_32(c.c_str(), static_cast<int>(c.length()), seed, &seed); + MurmurHash3_x86_32(a.c_str(), static_cast<unsigned int>(a.length()), seed, &seed); + MurmurHash3_x86_32(b.c_str(), static_cast<unsigned int>(b.length()), seed, &seed); + MurmurHash3_x86_32(c.c_str(), static_cast<unsigned int>(c.length()), seed, &seed); return static_cast<size_t>(seed); } @@ -79,9 +79,9 @@ size_t hash<tuple<string, string, string, bool> >::operator()(const tuple<string const bool d = get<3>(t); uint32_t seed = static_cast<uint32_t>(SMHASHER_SEED_VALUE); - MurmurHash3_x86_32(a.c_str(), static_cast<int>(a.length()), seed, &seed); - MurmurHash3_x86_32(b.c_str(), static_cast<int>(b.length()), seed, &seed); - MurmurHash3_x86_32(c.c_str(), static_cast<int>(c.length()), seed, &seed); + MurmurHash3_x86_32(a.c_str(), static_cast<unsigned int>(a.length()), seed, &seed); + MurmurHash3_x86_32(b.c_str(), static_cast<unsigned int>(b.length()), seed, &seed); + MurmurHash3_x86_32(c.c_str(), static_cast<unsigned int>(c.length()), seed, &seed); MurmurHash3_x86_32(&d, sizeof(bool), seed, &seed); return static_cast<size_t>(seed); @@ -94,9 +94,9 @@ size_t hash<tuple<string, string, string, int> >::operator()(const tuple<string, const int d = get<3>(t); uint32_t seed = static_cast<uint32_t>(SMHASHER_SEED_VALUE); - MurmurHash3_x86_32(a.c_str(), static_cast<int>(a.length()), seed, &seed); - MurmurHash3_x86_32(b.c_str(), static_cast<int>(b.length()), seed, &seed); - MurmurHash3_x86_32(c.c_str(), static_cast<int>(c.length()), seed, &seed); + MurmurHash3_x86_32(a.c_str(), static_cast<unsigned int>(a.length()), seed, &seed); + MurmurHash3_x86_32(b.c_str(), static_cast<unsigned int>(b.length()), seed, &seed); + MurmurHash3_x86_32(c.c_str(), static_cast<unsigned int>(c.length()), seed, &seed); MurmurHash3_x86_32(&d, sizeof(d), seed, &seed); return static_cast<size_t>(seed); @@ -109,10 +109,10 @@ size_t hash<tuple<string, string, string, string> >::operator()(const tuple<stri const string& d = get<3>(t); uint32_t seed = static_cast<uint32_t>(SMHASHER_SEED_VALUE); - MurmurHash3_x86_32(a.c_str(), static_cast<int>(a.length()), seed, &seed); - MurmurHash3_x86_32(b.c_str(), static_cast<int>(b.length()), seed, &seed); - MurmurHash3_x86_32(c.c_str(), static_cast<int>(c.length()), seed, &seed); - MurmurHash3_x86_32(d.c_str(), static_cast<int>(d.length()), seed, &seed); + MurmurHash3_x86_32(a.c_str(), static_cast<unsigned int>(a.length()), seed, &seed); + MurmurHash3_x86_32(b.c_str(), static_cast<unsigned int>(b.length()), seed, &seed); + MurmurHash3_x86_32(c.c_str(), static_cast<unsigned int>(c.length()), seed, &seed); + MurmurHash3_x86_32(d.c_str(), static_cast<unsigned int>(d.length()), seed, &seed); return static_cast<size_t>(seed); } diff --git a/src/CommonAPI/DBus/DBusInstanceAvailabilityStatusChangedEvent.cpp b/src/CommonAPI/DBus/DBusInstanceAvailabilityStatusChangedEvent.cpp index 93cd84b..a8d538d 100644 --- a/src/CommonAPI/DBus/DBusInstanceAvailabilityStatusChangedEvent.cpp +++ b/src/CommonAPI/DBus/DBusInstanceAvailabilityStatusChangedEvent.cpp @@ -10,10 +10,24 @@ namespace CommonAPI { namespace DBus { +DBusInstanceAvailabilityStatusChangedEvent::SignalHandler::SignalHandler(DBusInstanceAvailabilityStatusChangedEvent* _instanceAvblStatusEvent) : + instanceAvblStatusEvent_(_instanceAvblStatusEvent) { + +} + +void DBusInstanceAvailabilityStatusChangedEvent::SignalHandler::onSignalDBusMessage(const DBusMessage& dbusMessage) { + if (dbusMessage.hasMemberName("InterfacesAdded")) { + instanceAvblStatusEvent_->onInterfacesAddedSignal(dbusMessage); + } else if (dbusMessage.hasMemberName("InterfacesRemoved")) { + instanceAvblStatusEvent_->onInterfacesRemovedSignal(dbusMessage); + } +} + DBusInstanceAvailabilityStatusChangedEvent::DBusInstanceAvailabilityStatusChangedEvent( DBusProxy &_proxy, const std::string &_dbusInterfaceName, const std::string &_capiInterfaceName) : + signalHandler_(std::make_shared<SignalHandler>(this)), proxy_(_proxy), observedCapiInterfaceName_(_capiInterfaceName), registry_(DBusServiceRegistry::get(_proxy.getDBusConnection())) { @@ -25,16 +39,8 @@ DBusInstanceAvailabilityStatusChangedEvent::DBusInstanceAvailabilityStatusChange } DBusInstanceAvailabilityStatusChangedEvent::~DBusInstanceAvailabilityStatusChangedEvent() { - proxy_.removeSignalMemberHandler(interfacesAddedSubscription_, this); - proxy_.removeSignalMemberHandler(interfacesRemovedSubscription_, this); -} - -void DBusInstanceAvailabilityStatusChangedEvent::onSignalDBusMessage(const DBusMessage& dbusMessage) { - if (dbusMessage.hasMemberName("InterfacesAdded")) { - onInterfacesAddedSignal(dbusMessage); - } else if (dbusMessage.hasMemberName("InterfacesRemoved")) { - onInterfacesRemovedSignal(dbusMessage); - } + proxy_.removeSignalMemberHandler(interfacesAddedSubscription_, signalHandler_.get()); + proxy_.removeSignalMemberHandler(interfacesRemovedSubscription_, signalHandler_.get()); } void DBusInstanceAvailabilityStatusChangedEvent::getAvailableServiceInstances( @@ -105,12 +111,14 @@ std::future<CallStatus> DBusInstanceAvailabilityStatusChangedEvent::getServiceIn void DBusInstanceAvailabilityStatusChangedEvent::onFirstListenerAdded(const Listener&) { + proxyWeakPtr_ = proxy_.shared_from_this(); + interfacesAddedSubscription_ = proxy_.addSignalMemberHandler( proxy_.getDBusAddress().getObjectPath(), DBusObjectManagerStub::getInterfaceName(), "InterfacesAdded", "oa{sa{sv}}", - this, + signalHandler_, false); interfacesRemovedSubscription_ = proxy_.addSignalMemberHandler( @@ -118,7 +126,7 @@ void DBusInstanceAvailabilityStatusChangedEvent::onFirstListenerAdded(const List DBusObjectManagerStub::getInterfaceName(), "InterfacesRemoved", "oas", - this, + signalHandler_, false); getAvailableServiceInstancesAsync([&](const CallStatus &_status, @@ -135,8 +143,8 @@ void DBusInstanceAvailabilityStatusChangedEvent::onFirstListenerAdded(const List } void DBusInstanceAvailabilityStatusChangedEvent::onLastListenerRemoved(const Listener&) { - proxy_.removeSignalMemberHandler(interfacesAddedSubscription_, this); - proxy_.removeSignalMemberHandler(interfacesRemovedSubscription_, this); + proxy_.removeSignalMemberHandler(interfacesAddedSubscription_, signalHandler_.get()); + proxy_.removeSignalMemberHandler(interfacesRemovedSubscription_, signalHandler_.get()); } void DBusInstanceAvailabilityStatusChangedEvent::onInterfacesAddedSignal(const DBusMessage &_message) { @@ -158,7 +166,10 @@ void DBusInstanceAvailabilityStatusChangedEvent::onInterfacesAddedSignal(const D if (dbusInputStream.hasError()) { COMMONAPI_ERROR(std::string(__FUNCTION__) + " failed to read interface name"); } - if(dbusInterfaceName == observedDbusInterfaceName_ && addInterface(dbusObjectPath, dbusInterfaceName)) { + if(auto itsProxy = proxyWeakPtr_.lock() && + dbusInterfaceName == observedDbusInterfaceName_ && + addInterface(dbusObjectPath, dbusInterfaceName)) { + (void)itsProxy; notifyInterfaceStatusChanged(dbusObjectPath, dbusInterfaceName, AvailabilityStatus::AVAILABLE); } } @@ -181,7 +192,10 @@ void DBusInstanceAvailabilityStatusChangedEvent::onInterfacesRemovedSignal(const } for (const auto& dbusInterfaceName : dbusInterfaceNames) { - if(dbusInterfaceName == observedDbusInterfaceName_ && removeInterface(dbusObjectPath, dbusInterfaceName)) { + if(auto itsProxy = proxyWeakPtr_.lock() && + dbusInterfaceName == observedDbusInterfaceName_ && + removeInterface(dbusObjectPath, dbusInterfaceName)) { + (void)itsProxy; notifyInterfaceStatusChanged(dbusObjectPath, dbusInterfaceName, AvailabilityStatus::NOT_AVAILABLE); } } @@ -198,9 +212,6 @@ void DBusInstanceAvailabilityStatusChangedEvent::notifyInterfaceStatusChanged( DBusAddressTranslator::get()->translate(itsDBusAddress, itsAddress); - // ensure, the proxy and the event survives until notification is done - auto itsProxy = proxy_.shared_from_this(); - notifyListeners(itsAddress.getAddress(), _availability); } diff --git a/src/CommonAPI/DBus/DBusMainLoop.cpp b/src/CommonAPI/DBus/DBusMainLoop.cpp index b9daea3..5bf27a2 100755 --- a/src/CommonAPI/DBus/DBusMainLoop.cpp +++ b/src/CommonAPI/DBus/DBusMainLoop.cpp @@ -220,7 +220,9 @@ void DBusMainLoop::stop() { void DBusMainLoop::doSingleIteration(const int64_t& timeout) { { - std::lock_guard<std::mutex> itsLock(dispatchSourcesMutex_); + std::lock_guard<std::mutex> itsDispatchSourcesLock(dispatchSourcesMutex_); + std::lock_guard<std::mutex> itsWatchesLock(watchesMutex_); + for (auto dispatchSourceIterator = registeredDispatchSources_.begin(); dispatchSourceIterator != registeredDispatchSources_.end(); dispatchSourceIterator++) { @@ -257,6 +259,43 @@ void DBusMainLoop::doSingleIteration(const int64_t& timeout) { (dispatchSourceIterator->second)->mutex_->unlock(); } } + + for (auto watchesIterator = registeredWatches_.begin(); + watchesIterator != registeredWatches_.end(); + watchesIterator++) { + + (watchesIterator->second)->mutex_->lock(); + if ((watchesIterator->second)->deleteObject_) { + if (!(watchesIterator->second)->isExecuted_) { + (watchesIterator->second)->mutex_->unlock(); + bool contained = false; + for (auto watchesIteratorInner = watchesToDispatch_.begin(); + watchesIteratorInner != watchesToDispatch_.end(); watchesIteratorInner++) { + if (std::get<1>(*watchesIteratorInner)->watch_ == (watchesIterator->second)->watch_) { + contained = true; + break; + } + } + if (!contained) { + delete (watchesIterator->second)->watch_; + (watchesIterator->second)->watch_ = NULL; + delete (watchesIterator->second)->mutex_; + (watchesIterator->second)->mutex_ = NULL; + delete watchesIterator->second; + watchesIterator = registeredWatches_.erase(watchesIterator); + } + if (watchesIterator == registeredWatches_.end()) { + break; + } + } + else { + (watchesIterator->second)->mutex_->unlock(); + } + } + else { + (watchesIterator->second)->mutex_->unlock(); + } + } } { @@ -299,46 +338,6 @@ void DBusMainLoop::doSingleIteration(const int64_t& timeout) { } } - { - std::lock_guard<std::mutex> itsLock(watchesMutex_); - for (auto watchesIterator = registeredWatches_.begin(); - watchesIterator != registeredWatches_.end(); - watchesIterator++) { - - (watchesIterator->second)->mutex_->lock(); - if ((watchesIterator->second)->deleteObject_) { - if (!(watchesIterator->second)->isExecuted_) { - (watchesIterator->second)->mutex_->unlock(); - bool contained = false; - for (auto watchesIteratorInner = watchesToDispatch_.begin(); - watchesIteratorInner != watchesToDispatch_.end(); watchesIteratorInner++) { - if (std::get<1>(*watchesIteratorInner)->watch_ == (watchesIterator->second)->watch_) { - contained = true; - break; - } - } - if (!contained) { - delete (watchesIterator->second)->watch_; - (watchesIterator->second)->watch_ = NULL; - delete (watchesIterator->second)->mutex_; - (watchesIterator->second)->mutex_ = NULL; - delete watchesIterator->second; - watchesIterator = registeredWatches_.erase(watchesIterator); - } - if (watchesIterator == registeredWatches_.end()) { - break; - } - } - else { - (watchesIterator->second)->mutex_->unlock(); - } - } - else { - (watchesIterator->second)->mutex_->unlock(); - } - } - } - if (prepare(timeout)) { dispatch(); } else { @@ -456,7 +455,6 @@ void DBusMainLoop::poll() { } bool DBusMainLoop::check() { - //The first file descriptor always is the loop's wakeup-descriptor (but not for windows anymore). All others need to be linked to a watch. int managedFileDescriptorOffset = 1; { std::lock_guard<std::mutex> itsLock(fileDescriptorsMutex_); @@ -701,10 +699,10 @@ void DBusMainLoop::unregisterDispatchSource(DispatchSource* dispatchSource) { void DBusMainLoop::registerWatch(Watch* watch, const DispatchPriority dispatchPriority) { - std::lock_guard<std::mutex> itsLock(watchesMutex_); DBusMainLoopPollFd fdToRegister = watch->getAssociatedFileDescriptor(); - registerFileDescriptor(fdToRegister); + + std::lock_guard<std::mutex> itsLock(watchesMutex_); std::mutex* mtx = new std::mutex; #ifdef WIN32 std::atomic_signal_fence(std::memory_order_acq_rel); diff --git a/src/CommonAPI/DBus/DBusMainLoopContext.cpp b/src/CommonAPI/DBus/DBusMainLoopContext.cpp index 428807f..3124027 100644 --- a/src/CommonAPI/DBus/DBusMainLoopContext.cpp +++ b/src/CommonAPI/DBus/DBusMainLoopContext.cpp @@ -5,6 +5,7 @@ #ifdef WIN32 #include <WinSock2.h> +#include <ws2tcpip.h> #else #include <poll.h> #include <unistd.h> @@ -181,81 +182,127 @@ void DBusWatch::addDependentDispatchSource(DispatchSource* dispatchSource) { DBusQueueWatch::DBusQueueWatch(std::shared_ptr<DBusConnection> _connection) : pipeValue_(4) { #ifdef WIN32 - std::string pipeName = "\\\\.\\pipe\\CommonAPI-DBus-"; - - UUID uuid; - CHAR* uuidString = NULL; - UuidCreate(&uuid); - UuidToString(&uuid, (RPC_CSTR*)&uuidString); - pipeName += uuidString; - RpcStringFree((RPC_CSTR*)&uuidString); - - HANDLE hPipe = ::CreateNamedPipe( - pipeName.c_str(), - PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, - PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE, - 1, - 4096, - 4096, - 100, - nullptr); - - if (hPipe == INVALID_HANDLE_VALUE) { - if (GetLastError() != ERROR_PIPE_BUSY) - { - printf("Could not open pipe %d\n", GetLastError()); - } + WSADATA wsaData; + int iResult; - // All pipe instances are busy, so wait for sometime. - else if (!WaitNamedPipe(pipeName.c_str(), NMPWAIT_USE_DEFAULT_WAIT)) - { - printf("Could not open pipe: wait timed out.\n"); - } + SOCKET ListenSocket = INVALID_SOCKET; + + struct addrinfo *result = NULL; + struct addrinfo hints; + + // Initialize Winsock + iResult = WSAStartup(MAKEWORD(2, 2), &wsaData); + if (iResult != 0) { + printf("WSAStartup failed with error: %d\n", iResult); } - HANDLE hPipe2 = CreateFile( - pipeName.c_str(), // pipe name - GENERIC_READ | // read and write access - GENERIC_WRITE, - 0, // no sharing - NULL, // default security attributes - OPEN_EXISTING, // opens existing pipe - 0, // default attributes - NULL); // no template file - - if (hPipe2 == INVALID_HANDLE_VALUE) { - if (GetLastError() != ERROR_PIPE_BUSY) - { - printf("Could not open pipe2 %d\n", GetLastError()); - } + ZeroMemory(&hints, sizeof(hints)); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + hints.ai_flags = AI_PASSIVE; + + // Resolve the server address and port + iResult = getaddrinfo(NULL, "0", &hints, &result); + if (iResult != 0) { + printf("getaddrinfo failed with error: %d\n", iResult); + WSACleanup(); + } - // All pipe instances are busy, so wait for sometime. - else if (!WaitNamedPipe(pipeName.c_str(), NMPWAIT_USE_DEFAULT_WAIT)) - { - printf("Could not open pipe2: wait timed out.\n"); - } + // Create a SOCKET for connecting to server + ListenSocket = socket(result->ai_family, result->ai_socktype, result->ai_protocol); + if (ListenSocket == INVALID_SOCKET) { + printf("socket failed with error: %ld\n", WSAGetLastError()); + freeaddrinfo(result); + WSACleanup(); } - pipeFileDescriptors_[0] = (int)hPipe; - pipeFileDescriptors_[1] = (int)hPipe2; + // Setup the TCP listening socket + iResult = bind(ListenSocket, result->ai_addr, (int)result->ai_addrlen); + if (iResult == SOCKET_ERROR) { + printf("bind failed with error: %d\n", WSAGetLastError()); + freeaddrinfo(result); + closesocket(ListenSocket); + WSACleanup(); + } - wsaEvent_ = ::CreateEventW(nullptr, TRUE, FALSE, nullptr); + sockaddr* connected_addr = new sockaddr(); + USHORT port = 0; + int namelength = sizeof(sockaddr); + iResult = getsockname(ListenSocket, connected_addr, &namelength); + if (iResult == SOCKET_ERROR) { + printf("getsockname failed with error: %d\n", WSAGetLastError()); + } else if (connected_addr->sa_family == AF_INET) { + port = ((struct sockaddr_in*)connected_addr)->sin_port; + } + delete connected_addr; + + freeaddrinfo(result); - if (wsaEvent_ == WSA_INVALID_EVENT) { - printf("Invalid Event Created!\n"); + iResult = listen(ListenSocket, SOMAXCONN); + if (iResult == SOCKET_ERROR) { + printf("listen failed with error: %d\n", WSAGetLastError()); + closesocket(ListenSocket); + WSACleanup(); } - ov = { 0 }; - ov.hEvent = wsaEvent_; + wsaData; + pipeFileDescriptors_[0] = INVALID_SOCKET; + struct addrinfo *ptr = NULL; + + // Initialize Winsock + iResult = WSAStartup(MAKEWORD(2, 2), &wsaData); + if (iResult != 0) { + printf("WSAStartup failed with error: %d\n", iResult); + } + + ZeroMemory(&hints, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + + // Resolve the server address and port + iResult = getaddrinfo("127.0.0.1", std::to_string(ntohs(port)).c_str(), &hints, &result); + if (iResult != 0) { + printf("getaddrinfo failed with error: %d\n", iResult); + WSACleanup(); + } - BOOL retVal = ::ConnectNamedPipe(hPipe, &ov); + // Attempt to connect to an address until one succeeds + for (ptr = result; ptr != NULL; ptr = ptr->ai_next) { - if (retVal == 0) { - int error = GetLastError(); + // Create a SOCKET for connecting to server + pipeFileDescriptors_[0] = socket(ptr->ai_family, ptr->ai_socktype, + ptr->ai_protocol); + if (pipeFileDescriptors_[0] == INVALID_SOCKET) { + printf("socket failed with error: %ld\n", WSAGetLastError()); + WSACleanup(); + } - if (error != 535) { - printf("ERROR: ConnectNamedPipe failed with (%d)\n", error); + // Connect to server. + iResult = connect(pipeFileDescriptors_[0], ptr->ai_addr, (int)ptr->ai_addrlen); + if (iResult == SOCKET_ERROR) { + printf("connect failed with error: %ld\n", WSAGetLastError()); + closesocket(pipeFileDescriptors_[0]); + pipeFileDescriptors_[0] = INVALID_SOCKET; + continue; } + break; + } + + freeaddrinfo(result); + + if (pipeFileDescriptors_[0] == INVALID_SOCKET) { + printf("Unable to connect to server!\n"); + WSACleanup(); + } + + // Accept a client socket + pipeFileDescriptors_[1] = accept(ListenSocket, NULL, NULL); + if (pipeFileDescriptors_[1] == INVALID_SOCKET) { + printf("accept failed with error: %d\n", WSAGetLastError()); + closesocket(ListenSocket); + WSACleanup(); } #else if(pipe2(pipeFileDescriptors_, O_NONBLOCK) == -1) { @@ -270,23 +317,17 @@ DBusQueueWatch::DBusQueueWatch(std::shared_ptr<DBusConnection> _connection) : pi DBusQueueWatch::~DBusQueueWatch() { #ifdef WIN32 - BOOL retVal = DisconnectNamedPipe((HANDLE)pipeFileDescriptors_[0]); - - if (!retVal) { - printf(TEXT("DisconnectNamedPipe failed. GLE=%d\n"), GetLastError()); + // shutdown the connection since no more data will be sent + int iResult = shutdown(pipeFileDescriptors_[0], SD_SEND); + if (iResult == SOCKET_ERROR) { + printf("shutdown failed with error: %d\n", WSAGetLastError()); + closesocket(pipeFileDescriptors_[0]); + WSACleanup(); } - retVal = CloseHandle((HANDLE)pipeFileDescriptors_[0]); - - if (!retVal) { - printf(TEXT("CloseHandle failed. GLE=%d\n"), GetLastError()); - } - - retVal = CloseHandle((HANDLE)pipeFileDescriptors_[1]); - - if (!retVal) { - printf(TEXT("CloseHandle2 failed. GLE=%d\n"), GetLastError()); - } + // cleanup + closesocket(pipeFileDescriptors_[0]); + WSACleanup(); #else close(pipeFileDescriptors_[0]); close(pipeFileDescriptors_[1]); @@ -337,20 +378,16 @@ void DBusQueueWatch::pushQueue(std::shared_ptr<QueueEntry> _queueEntry) { queue_.push(_queueEntry); #ifdef WIN32 - char writeValue[sizeof(pipeValue_)]; - *reinterpret_cast<int*>(writeValue) = pipeValue_; - DWORD cbWritten; + // Send an initial buffer + char *sendbuf = "1"; - int fSuccess = WriteFile( - (HANDLE)pipeFileDescriptors_[1], // pipe handle - writeValue, // message - sizeof(pipeValue_), // message length - &cbWritten, // bytes written - &ov); // overlapped + int iResult = send(pipeFileDescriptors_[1], sendbuf, (int)strlen(sendbuf), 0); + if (iResult == SOCKET_ERROR) { + int error = WSAGetLastError(); - if (!fSuccess) - { - printf(TEXT("WriteFile to pipe failed. GLE=%d\n"), GetLastError()); + if (error != WSANOTINITIALISED) { + printf("send failed with error: %d\n", error); + } } #else if(write(pipeFileDescriptors_[1], &pipeValue_, sizeof(pipeValue_)) == -1) { @@ -363,19 +400,20 @@ void DBusQueueWatch::popQueue() { std::unique_lock<std::mutex> itsLock(queueMutex_); #ifdef WIN32 - char readValue[sizeof(pipeValue_)]; - DWORD cbRead; - - int fSuccess = ReadFile( - (HANDLE)pipeFileDescriptors_[0], // pipe handle - readValue, // buffer to receive reply - sizeof(pipeValue_), // size of buffer - &cbRead, // number of bytes read - &ov); // overlapped - - if (!fSuccess) - { - printf(TEXT("ReadFile to pipe failed. GLE=%d\n"), GetLastError()); + // Receive until the peer closes the connection + int iResult; + char recvbuf[1]; + int recvbuflen = 1; + + iResult = recv(pipeFileDescriptors_[0], recvbuf, recvbuflen, 0); + if (iResult > 0) { + //printf("Bytes received from %d: %d\n", wakeFd_.fd, iResult); + } + else if (iResult == 0) { + printf("Connection closed\n"); + } + else { + printf("recv failed with error: %d\n", WSAGetLastError()); } #else int readValue = 0; diff --git a/src/CommonAPI/DBus/DBusMessage.cpp b/src/CommonAPI/DBus/DBusMessage.cpp index 09199dc..927624c 100644 --- a/src/CommonAPI/DBus/DBusMessage.cpp +++ b/src/CommonAPI/DBus/DBusMessage.cpp @@ -115,7 +115,7 @@ DBusMessage::createMethodReturn(const std::string &_signature) const { DBusMessage DBusMessage::createMethodError( - const std::string &_code, const std::string &_info) const { + const std::string &_code, const std::string &_signature, const std::string &_info) const { ::DBusMessage *methodError = dbus_message_new_error(message_, _code.c_str(), _info.c_str()); @@ -123,6 +123,8 @@ DBusMessage::createMethodError( COMMONAPI_ERROR(std::string(__FUNCTION__), " dbus_message_new_error() returned NULL"); } + dbus_message_set_signature(methodError, _signature.c_str()); + return DBusMessage(methodError, false); } diff --git a/src/CommonAPI/DBus/DBusProxy.cpp b/src/CommonAPI/DBus/DBusProxy.cpp index 655fa6c..2d19670 100644 --- a/src/CommonAPI/DBus/DBusProxy.cpp +++ b/src/CommonAPI/DBus/DBusProxy.cpp @@ -25,14 +25,8 @@ void DBusProxyStatusEvent::onListenerAdded(const Listener &_listener, //notify listener about availability status -> push function to mainloop std::weak_ptr<DBusProxy> itsdbusProxy = dbusProxy_->shared_from_this(); - std::function<void(std::weak_ptr<DBusProxy>, Listener, Subscription)> notifySpecificListenerHandler = - std::bind(&DBusProxy::notifySpecificListener, - dbusProxy_, - std::placeholders::_1, - std::placeholders::_2, - std::placeholders::_3); dbusProxy_->getDBusConnection()->proxyPushFunctionToMainLoop<DBusConnection>( - notifySpecificListenerHandler, + DBusProxy::notifySpecificListener, itsdbusProxy, _listener, _subscription); @@ -201,10 +195,11 @@ DBusProxy::DBusProxy(const DBusAddress &_dbusAddress, } void DBusProxy::init() { - selfReference_ = shared_from_this(); + std::weak_ptr<DBusProxy> itsProxy = shared_from_this(); dbusServiceRegistrySubscription_ = dbusServiceRegistry_->subscribeAvailabilityListener( getAddress().getAddress(), - std::bind(&DBusProxy::onDBusServiceInstanceStatus, this, std::placeholders::_1)); + std::bind(&DBusProxy::onDBusServiceInstanceStatus, this, std::placeholders::_1, std::placeholders::_2), + itsProxy); } DBusProxy::~DBusProxy() { @@ -300,42 +295,44 @@ InterfaceVersionAttribute& DBusProxy::getInterfaceVersionAttribute() { void DBusProxy::signalMemberCallback(const CallStatus _status, const DBusMessage& dbusMessage, - DBusProxyConnection::DBusSignalHandler *_handler, + std::weak_ptr<DBusProxyConnection::DBusSignalHandler> _handler, const uint32_t _tag) { (void)_status; (void)_tag; - _handler->onSignalDBusMessage(dbusMessage); + if(auto itsHandler = _handler.lock()) + itsHandler->onSignalDBusMessage(dbusMessage); } void DBusProxy::signalInitialValueCallback(const CallStatus _status, const DBusMessage &_message, - DBusProxyConnection::DBusSignalHandler *_handler, + std::weak_ptr<DBusProxyConnection::DBusSignalHandler> _handler, const uint32_t _tag) { if (_status != CallStatus::SUCCESS) { COMMONAPI_ERROR("Error when receiving initial value of an attribute"); } else { - _handler->onInitialValueSignalDBusMessage(_message, _tag); + if(auto itsHandler = _handler.lock()) + itsHandler->onInitialValueSignalDBusMessage(_message, _tag); } } void DBusProxy::notifySpecificListener(std::weak_ptr<DBusProxy> _dbusProxy, const ProxyStatusEvent::Listener &_listener, const ProxyStatusEvent::Subscription _subscription) { - if(_dbusProxy.lock()) { - std::lock_guard<std::recursive_mutex> listenersLock(dbusProxyStatusEvent_.listenersMutex_); + if(auto itsDbusProxy = _dbusProxy.lock()) { + std::lock_guard<std::recursive_mutex> listenersLock(itsDbusProxy->dbusProxyStatusEvent_.listenersMutex_); - AvailabilityStatus itsStatus = availabilityStatus_; + AvailabilityStatus itsStatus = itsDbusProxy->availabilityStatus_; if (itsStatus != AvailabilityStatus::UNKNOWN) - dbusProxyStatusEvent_.notifySpecificListener(_subscription, itsStatus); + itsDbusProxy->dbusProxyStatusEvent_.notifySpecificListener(_subscription, itsStatus); //add listener to list so that it can be notified about a change of availability - dbusProxyStatusEvent_.listeners_.push_back(std::make_pair(_subscription, _listener)); + itsDbusProxy->dbusProxyStatusEvent_.listeners_.push_back(std::make_pair(_subscription, _listener)); } } -void DBusProxy::onDBusServiceInstanceStatus(const AvailabilityStatus& availabilityStatus) { - //ensure, proxy survives until notification is done - auto itsSelf = selfReference_.lock(); +void DBusProxy::onDBusServiceInstanceStatus(std::shared_ptr<DBusProxy> _proxy, + const AvailabilityStatus& availabilityStatus) { + (void)_proxy; if (availabilityStatus != availabilityStatus_) { availabilityMutex_.lock(); availabilityStatus_ = availabilityStatus; @@ -402,13 +399,14 @@ void DBusProxy::onDBusServiceInstanceStatus(const AvailabilityStatus& availabili signalMemberHandlerIterator != signalMemberHandlerQueue_.end(); signalMemberHandlerIterator++) { - if (std::get<7>(*signalMemberHandlerIterator)) { + auto itsHandler = std::get<5>(*signalMemberHandlerIterator).lock(); + if (itsHandler && std::get<7>(*signalMemberHandlerIterator)) { DBusProxyConnection::DBusSignalHandlerToken signalHandlerToken ( std::get<0>(*signalMemberHandlerIterator), std::get<1>(*signalMemberHandlerIterator), std::get<2>(*signalMemberHandlerIterator), std::get<3>(*signalMemberHandlerIterator)); - connection_->removeSignalMemberHandler(signalHandlerToken, std::get<5>(*signalMemberHandlerIterator)); + connection_->removeSignalMemberHandler(signalHandlerToken, itsHandler.get()); std::get<7>(*signalMemberHandlerIterator) = false; } } @@ -420,7 +418,8 @@ void DBusProxy::onDBusServiceInstanceStatus(const AvailabilityStatus& availabili } void DBusProxy::insertSelectiveSubscription(const std::string& interfaceMemberName, - DBusProxyConnection::DBusSignalHandler* dbusSignalHandler, uint32_t tag, + std::weak_ptr<DBusProxyConnection::DBusSignalHandler> dbusSignalHandler, + uint32_t tag, std::string interfaceMemberSignature) { std::lock_guard < std::mutex > queueLock(selectiveBroadcastHandlersMutex_); selectiveBroadcastHandlers[interfaceMemberName] = std::make_tuple( @@ -432,7 +431,7 @@ void DBusProxy::subscribeForSelectiveBroadcastOnConnection( const std::string& interfaceName, const std::string& interfaceMemberName, const std::string& interfaceMemberSignature, - DBusProxyConnection::DBusSignalHandler* dbusSignalHandler, + std::weak_ptr<DBusProxyConnection::DBusSignalHandler> dbusSignalHandler, uint32_t tag) { getDBusConnection()->subscribeForSelectiveBroadcast( @@ -464,7 +463,7 @@ DBusProxyConnection::DBusSignalHandlerToken DBusProxy::addSignalMemberHandler( const std::string& interfaceName, const std::string& signalName, const std::string& signalSignature, - DBusProxyConnection::DBusSignalHandler* dbusSignalHandler, + std::weak_ptr<DBusProxyConnection::DBusSignalHandler> dbusSignalHandler, const bool justAddFilter) { return DBusProxyBase::addSignalMemberHandler( objectPath, @@ -481,7 +480,7 @@ DBusProxyConnection::DBusSignalHandlerToken DBusProxy::addSignalMemberHandler( const std::string &signalName, const std::string &signalSignature, const std::string &getMethodName, - DBusProxyConnection::DBusSignalHandler *dbusSignalHandler, + std::weak_ptr<DBusProxyConnection::DBusSignalHandler> dbusSignalHandler, const bool justAddFilter) { DBusProxyConnection::DBusSignalHandlerToken signalHandlerToken ( @@ -555,7 +554,7 @@ void DBusProxy::addSignalMemberHandlerToQueue(SignalMemberHandlerTuple& _signalM bool DBusProxy::removeSignalMemberHandler( const DBusProxyConnection::DBusSignalHandlerToken &_dbusSignalHandlerToken, - const DBusProxyConnection::DBusSignalHandler *_dbusSignalHandler) { + const DBusProxyConnection::DBusSignalHandler* _dbusSignalHandler) { { std::lock_guard < std::mutex > queueLock(signalMemberHandlerQueueMutex_); @@ -581,7 +580,7 @@ bool DBusProxy::removeSignalMemberHandler( void DBusProxy::getCurrentValueForSignalListener( const std::string &getMethodName, - DBusProxyConnection::DBusSignalHandler *dbusSignalHandler, + std::weak_ptr<DBusProxyConnection::DBusSignalHandler> dbusSignalHandler, const uint32_t subscription) { availabilityMutex_.lock(); @@ -607,7 +606,7 @@ void DBusProxy::getCurrentValueForSignalListener( } void DBusProxy::freeDesktopGetCurrentValueForSignalListener( - DBusProxyConnection::DBusSignalHandler *dbusSignalHandler, + std::weak_ptr<DBusProxyConnection::DBusSignalHandler> dbusSignalHandler, const uint32_t subscription, const std::string &interfaceName, const std::string &propertyName) { diff --git a/src/CommonAPI/DBus/DBusProxyBase.cpp b/src/CommonAPI/DBus/DBusProxyBase.cpp index 0b1ebe4..3052f9e 100644 --- a/src/CommonAPI/DBus/DBusProxyBase.cpp +++ b/src/CommonAPI/DBus/DBusProxyBase.cpp @@ -40,7 +40,7 @@ DBusProxyConnection::DBusSignalHandlerToken DBusProxyBase::addSignalMemberHandle const std::string& interfaceName, const std::string& signalName, const std::string& signalSignature, - DBusProxyConnection::DBusSignalHandler* dbusSignalHandler, + std::weak_ptr<DBusProxyConnection::DBusSignalHandler> dbusSignalHandler, const bool justAddFilter) { return connection_->addSignalMemberHandler( objectPath, @@ -57,7 +57,7 @@ DBusProxyConnection::DBusSignalHandlerToken DBusProxyBase::addSignalMemberHandle const std::string &signalName, const std::string &signalSignature, const std::string &getMethodName, - DBusProxyConnection::DBusSignalHandler *dbusSignalHandler, + std::weak_ptr<DBusProxyConnection::DBusSignalHandler> dbusSignalHandler, const bool justAddFilter) { (void)getMethodName; return addSignalMemberHandler( @@ -69,7 +69,8 @@ DBusProxyConnection::DBusSignalHandlerToken DBusProxyBase::addSignalMemberHandle justAddFilter); } -bool DBusProxyBase::removeSignalMemberHandler(const DBusProxyConnection::DBusSignalHandlerToken& _dbusSignalHandlerToken, const DBusProxyConnection::DBusSignalHandler* _dbusSignalHandler) { +bool DBusProxyBase::removeSignalMemberHandler(const DBusProxyConnection::DBusSignalHandlerToken& _dbusSignalHandlerToken, + const DBusProxyConnection::DBusSignalHandler* _dbusSignalHandler) { return connection_->removeSignalMemberHandler(_dbusSignalHandlerToken, _dbusSignalHandler); } diff --git a/src/CommonAPI/DBus/DBusServiceRegistry.cpp b/src/CommonAPI/DBus/DBusServiceRegistry.cpp index d345a9d..dc7daa8 100644 --- a/src/CommonAPI/DBus/DBusServiceRegistry.cpp +++ b/src/CommonAPI/DBus/DBusServiceRegistry.cpp @@ -17,22 +17,22 @@ namespace CommonAPI { namespace DBus { -DBusServiceRegistry::RegistryMap_t DBusServiceRegistry::registries_; std::mutex DBusServiceRegistry::registriesMutex_; static CommonAPI::CallInfo serviceRegistryInfo(10000); std::shared_ptr<DBusServiceRegistry> DBusServiceRegistry::get(std::shared_ptr<DBusProxyConnection> _connection) { std::lock_guard<std::mutex> itsGuard(registriesMutex_); - auto registryIterator = registries_.find(_connection); - if (registryIterator != registries_.end()) + auto registries = getRegistryMap(); + auto registryIterator = registries->find(_connection); + if (registryIterator != registries->end()) return registryIterator->second; std::shared_ptr<DBusServiceRegistry> registry = std::make_shared<DBusServiceRegistry>(_connection); if (registry) { registry->init(); - registries_.insert( { _connection, registry } ); + registries->insert( { _connection, registry } ); } return registry; } @@ -40,13 +40,15 @@ DBusServiceRegistry::get(std::shared_ptr<DBusProxyConnection> _connection) { void DBusServiceRegistry::remove(std::shared_ptr<DBusProxyConnection> _connection) { std::lock_guard<std::mutex> itsGuard(registriesMutex_); - registries_.erase(_connection); + auto registries = getRegistryMap(); + registries->erase(_connection); } DBusServiceRegistry::DBusServiceRegistry(std::shared_ptr<DBusProxyConnection> dbusProxyConnection) : dbusDaemonProxy_(std::make_shared<CommonAPI::DBus::DBusDaemonProxy>(dbusProxyConnection)), initialized_(false), - notificationThread_() { + notificationThread_(), + registries_(getRegistryMap()) { } DBusServiceRegistry::~DBusServiceRegistry() { @@ -61,7 +63,9 @@ DBusServiceRegistry::~DBusServiceRegistry() { auto& dbusServiceListenersRecord = dbusServiceListenersIterator.second; if (dbusServiceListenersRecord.uniqueBusNameState == DBusRecordState::RESOLVED) { + dbusServicesMutex_.lock(); onDBusServiceNotAvailable(dbusServiceListenersRecord); + dbusServicesMutex_.unlock(); } } @@ -80,6 +84,7 @@ DBusServiceRegistry::~DBusServiceRegistry() { } void DBusServiceRegistry::init() { + selfReference_ = shared_from_this(); translator_ = DBusAddressTranslator::get(); dbusDaemonProxyNameOwnerChangedEventSubscription_ = @@ -97,7 +102,9 @@ void DBusServiceRegistry::init() { DBusServiceRegistry::DBusServiceSubscription DBusServiceRegistry::subscribeAvailabilityListener( - const std::string &_address, DBusServiceListener serviceListener) { + const std::string &_address, + DBusServiceListener serviceListener, + std::weak_ptr<DBusProxy> _proxy) { DBusAddress dbusAddress; translator_->translate(_address, dbusAddress); @@ -161,11 +168,13 @@ DBusServiceRegistry::subscribeAvailabilityListener( // LB TODO: check this as it looks STRANGE!!! if (availabilityStatus != AvailabilityStatus::UNKNOWN) { notificationThread_ = std::this_thread::get_id(); - serviceListener(availabilityStatus); + if(auto itsProxy = _proxy.lock()) + serviceListener(itsProxy, availabilityStatus); notificationThread_ = std::thread::id(); } dbusInterfaceNameListenersRecord.listenerList.push_front(std::move(serviceListener)); + dbusInterfaceNameListenersRecord.proxy = _proxy; dbusInterfaceNameListenersRecord.listenersToRemove.remove( dbusInterfaceNameListenersRecord.listenerList.begin()); @@ -510,7 +519,7 @@ void DBusServiceRegistry::resolveDBusServiceName(const std::string& dbusServiceN auto func = std::bind( &DBusServiceRegistry::onGetNameOwnerCallback, - this->shared_from_this(), + shared_from_this(), std::placeholders::_1, std::placeholders::_2, dbusServiceName); @@ -611,7 +620,7 @@ DBusServiceRegistry::getDBusObjectPathCacheReference( auto dbusProxyConnection = dbusDaemonProxy_->getDBusConnection(); const bool isSubscriptionSuccessful = dbusProxyConnection->addObjectManagerSignalMemberHandler( dbusServiceUniqueName, - this); + selfReference_); if (!isSubscriptionSuccessful) { COMMONAPI_ERROR(std::string(__FUNCTION__), " cannot subscribe too ", dbusServiceUniqueName); } @@ -682,7 +691,7 @@ bool DBusServiceRegistry::resolveObjectPathWithObjectManager(const std::string& // get managed objects from root object manager auto getManagedObjectsCallback = std::bind( &DBusServiceRegistry::onGetManagedObjectsCallbackResolve, - this->shared_from_this(), + shared_from_this(), std::placeholders::_1, std::placeholders::_2, dbusServiceUniqueName, @@ -788,16 +797,37 @@ void DBusServiceRegistry::onGetManagedObjectsCallbackResolve(const CallStatus& c } if(!objectPathFound) { - // object path is managed. Try to resolve object path with the help of the manager - auto getManagedObjectsCallback = std::bind( - &DBusServiceRegistry::onGetManagedObjectsCallbackResolve, - this->shared_from_this(), - std::placeholders::_1, - std::placeholders::_2, - dbusServiceUniqueName, - dbusObjectPath); + // check if the main part of the object path is in the list. + // if it is, the object path could be managed. + // else, it maybe existed a while back but is now gone, in which case just ignore. + std::string objectPathManager = dbusObjectPath.substr(0, dbusObjectPath.find_last_of("\\/")); - getManagedObjectsAsync(dbusServiceUniqueName, objectPathManager, getManagedObjectsCallback); + for(auto objectPathDict : availableServiceInstances) + { + + std::string objectPath = objectPathDict.first; + + if (dbusObjectPath.substr(0, objectPath.size()) != objectPath) + continue; + + // also check that the next character in dbusObject path is a slash or a backslash, + // so that we can make sure that we have compared against a full path element + auto delimiter = dbusObjectPath.at(objectPath.size()); + if (delimiter != '\\' && delimiter != '/') + continue; + + // object path is managed. Try to resolve object path with the help of the manager + auto getManagedObjectsCallback = std::bind( + &DBusServiceRegistry::onGetManagedObjectsCallbackResolve, + shared_from_this(), + std::placeholders::_1, + std::placeholders::_2, + dbusServiceUniqueName, + dbusObjectPath); + getManagedObjectsAsync(dbusServiceUniqueName, objectPathManager, getManagedObjectsCallback); + + } + } } else { COMMONAPI_ERROR("There is no Object Manager that manages " + dbusObjectPath + ". Resolving failed!"); @@ -1070,7 +1100,8 @@ void DBusServiceRegistry::notifyDBusInterfaceNameListeners(DBusInterfaceNameList dbusInterfaceNameListenersRecord.listenersToRemove.remove(dbusServiceListenerIterator); dbusServiceListenerIterator = dbusInterfaceNameListenersRecord.listenerList.erase(dbusServiceListenerIterator); } else { - (*dbusServiceListenerIterator)(availabilityStatus); + if(auto itsProxy = dbusInterfaceNameListenersRecord.proxy.lock()) + (*dbusServiceListenerIterator)(itsProxy, availabilityStatus); ++dbusServiceListenerIterator; } } |