diff options
author | Jürgen Gehring <Juergen.Gehring@bmw.de> | 2016-10-11 03:33:43 -0700 |
---|---|---|
committer | Jürgen Gehring <Juergen.Gehring@bmw.de> | 2016-10-11 03:33:43 -0700 |
commit | 8518d74e952ff0dbbb430e944fc3bee1b879f881 (patch) | |
tree | b9eca882899e33663ccf8c127050fcc4746d5957 | |
parent | bdf7ab8cf3243619f0b8bc526f07e5b03624b94c (diff) | |
download | genivi-common-api-dbus-runtime-8518d74e952ff0dbbb430e944fc3bee1b879f881.tar.gz |
CommonAPI-D-Bus 3.1.93.1.9
30 files changed, 1523 insertions, 1240 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 4cb6030..f65ecb0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,7 +10,7 @@ PROJECT(libcommonapi-dbus) # version of CommonAPI-DBus SET( LIBCOMMONAPI_DBUS_MAJOR_VERSION 3 ) SET( LIBCOMMONAPI_DBUS_MINOR_VERSION 1 ) -SET( LIBCOMMONAPI_DBUS_PATCH_VERSION 8 ) +SET( LIBCOMMONAPI_DBUS_PATCH_VERSION 9 ) message(STATUS "Project name: ${PROJECT_NAME}") @@ -123,9 +123,9 @@ message(STATUS "CMAKE_FIND_ROOT_PATH: ${CMAKE_FIND_ROOT_PATH}") FIND_PACKAGE(PkgConfig) FIND_PACKAGE(Threads REQUIRED) if ("${USE_INSTALLED_COMMONAPI}" STREQUAL "ON") - FIND_PACKAGE(CommonAPI 3.1.8 REQUIRED CONFIG NO_CMAKE_PACKAGE_REGISTRY) + FIND_PACKAGE(CommonAPI 3.1.9 REQUIRED CONFIG NO_CMAKE_PACKAGE_REGISTRY) else() - FIND_PACKAGE(CommonAPI 3.1.8 REQUIRED CONFIG NO_SYSTEM_ENVIRONMENT_PATH NO_CMAKE_SYSTEM_PATH) + FIND_PACKAGE(CommonAPI 3.1.9 REQUIRED CONFIG NO_SYSTEM_ENVIRONMENT_PATH NO_CMAKE_SYSTEM_PATH) endif() message(STATUS "CommonAPI_CONSIDERED_CONFIGS: ${CommonAPI_CONSIDERED_CONFIGS}") @@ -161,8 +161,8 @@ endif() if (MSVC) # Visual C++ is not always sure whether he is really C++ -set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_CRT_SECURE_NO_WARNINGS -DCOMMONAPI_INTERNAL_COMPILATION -DCOMMONAPI_DLL_COMPILATION /EHsc /wd\\\"4503\\\"") -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_CRT_SECURE_NO_WARNINGS -DCOMMONAPI_INTERNAL_COMPILATION -DCOMMONAPI_DLL_COMPILATION /wd\\\"4503\\\"") +set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_CRT_SECURE_NO_WARNINGS -DCOMMONAPI_INTERNAL_COMPILATION -DCOMMONAPI_DLL_COMPILATION /EHsc /wd4503") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_CRT_SECURE_NO_WARNINGS -DCOMMONAPI_INTERNAL_COMPILATION -DCOMMONAPI_DLL_COMPILATION /wd4503") else() set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x -Wall -Wextra -Wformat -Wformat-security -Wconversion -fexceptions -fstrict-aliasing -fstack-protector -fasynchronous-unwind-tables -fno-omit-frame-pointer -DCOMMONAPI_INTERNAL_COMPILATION -D_GLIBCXX_USE_NANOSLEEP") endif() @@ -186,12 +186,15 @@ endif() # DBus source files file(GLOB CAPIDB_SRCS "src/CommonAPI/DBus/*.cpp") +list(SORT CAPIDB_SRCS) # pugixml source files file(GLOB PUGIXML_SRCS "src/pugixml/*.cpp") +list(SORT PUGIXML_SRCS) # murmurhash source files file(GLOB MMHASH_SRCS "src/murmurhash/*.cpp") +list(SORT MMHASH_SRCS) # CommonAPI-DBus library add_library(CommonAPI-DBus SHARED ${CAPIDB_SRCS} ${PUGIXML_SRCS} ${MMHASH_SRCS}) @@ -215,6 +218,7 @@ CONFIGURE_FILE(commonapi-dbus.spec.in commonapi-dbus.spec) # for installation of CommonAPI header files file (GLOB_RECURSE CommonAPI-DBus_INCLUDE_INSTALL_FILES "include/CommonAPI/DBus/*.hpp") +list (SORT CommonAPI-DBus_INCLUDE_INSTALL_FILES) set_target_properties (CommonAPI-DBus PROPERTIES PUBLIC_HEADER "${CommonAPI-DBus_INCLUDE_INSTALL_FILES}") # install CommonAPI-DBus library including headers @@ -1,3 +1,3 @@ -This is CommonAPI-DBus 3.1.8 +This is CommonAPI-DBus 3.1.9 Please refer to INSTALL for further information.
\ No newline at end of file diff --git a/include/CommonAPI/DBus/DBusConnection.hpp b/include/CommonAPI/DBus/DBusConnection.hpp index f79dc27..b32274f 100644 --- a/include/CommonAPI/DBus/DBusConnection.hpp +++ b/include/CommonAPI/DBus/DBusConnection.hpp @@ -26,6 +26,54 @@ namespace CommonAPI { namespace DBus { +class DBusConnection; + +struct QueueEntry { + QueueEntry() { } + virtual ~QueueEntry() { } + + virtual void process(std::shared_ptr<DBusConnection> _connection) = 0; + virtual void clear() = 0; +}; + +struct MsgQueueEntry : QueueEntry { + MsgQueueEntry(DBusMessage _message) : + message_(_message) { } + virtual ~MsgQueueEntry() { } + DBusMessage message_; + + virtual void process(std::shared_ptr<DBusConnection> _connection); + virtual void clear(); + }; + +struct MsgReplyQueueEntry : MsgQueueEntry { + MsgReplyQueueEntry(DBusProxyConnection::DBusMessageReplyAsyncHandler* _replyAsyncHandler, + DBusMessage _reply) : + MsgQueueEntry(_reply), + replyAsyncHandler_(_replyAsyncHandler) { } + virtual ~MsgReplyQueueEntry() { } + + DBusProxyConnection::DBusMessageReplyAsyncHandler* replyAsyncHandler_; + + void process(std::shared_ptr<DBusConnection> _connection); + void clear(); +}; + +template<class Function, class... Arguments> +struct FunctionQueueEntry : QueueEntry { + + using bindType = decltype(std::bind(std::declval<Function>(),std::declval<Arguments>()...)); + + FunctionQueueEntry(Function&& _function, + Arguments&& ... _args): + bind_(std::forward<Function>(_function), std::forward<Arguments>(_args)...) { } + + bindType bind_; + + void process(std::shared_ptr<DBusConnection> _connection); + void clear(); +}; + class DBusMainLoop; class DBusObjectManager; @@ -55,6 +103,16 @@ struct WatchContext { std::weak_ptr<DBusConnection> dbusConnection_; }; +struct TimeoutContext { + TimeoutContext(std::weak_ptr<MainLoopContext> mainLoopContext, + std::weak_ptr<DBusConnection> dbusConnection) : + mainLoopContext_(mainLoopContext), dbusConnection_(dbusConnection) { + } + + std::weak_ptr<MainLoopContext> mainLoopContext_; + std::weak_ptr<DBusConnection> dbusConnection_; +}; + class DBusConnection : public DBusProxyConnection, public std::enable_shared_from_this<DBusConnection> { @@ -88,7 +146,7 @@ public: COMMONAPI_EXPORT bool sendDBusMessage(const DBusMessage& dbusMessage/*, uint32_t* allocatedSerial = NULL*/) const; - COMMONAPI_EXPORT std::future<CallStatus> sendDBusMessageWithReplyAsync( + COMMONAPI_EXPORT bool sendDBusMessageWithReplyAsync( const DBusMessage& dbusMessage, std::unique_ptr<DBusMessageReplyAsyncHandler> dbusMessageReplyAsyncHandler, const CommonAPI::CallInfo *_info) const; @@ -150,9 +208,17 @@ public: COMMONAPI_EXPORT bool setDispatching(bool isDispatching); - COMMONAPI_EXPORT void pushDBusMessageReply(const DBusMessage& _reply, + template<class Function, class... Arguments> + COMMONAPI_EXPORT void processFunctionQueueEntry(FunctionQueueEntry<Function, Arguments ...> &_functionQueueEntry); + + COMMONAPI_EXPORT void pushDBusMessageReplyToMainLoop(const DBusMessage& _reply, std::unique_ptr<DBusMessageReplyAsyncHandler> _dbusMessageReplyAsyncHandler); + template<class Function, class... Arguments> + COMMONAPI_EXPORT void proxyPushFunctionToMainLoop(Function&& _function, Arguments&& ... _args); + + COMMONAPI_EXPORT void setPendingCallTimedOut(DBusPendingCall* _pendingCall, ::DBusTimeout* _timeout) const; + #ifdef COMMONAPI_DBUS_TEST inline std::weak_ptr<DBusMainloop> getLoop() { return loop_; } #endif @@ -179,10 +245,11 @@ public: std::thread* dispatchThread_; std::weak_ptr<MainLoopContext> mainLoopContext_; - DBusMessageWatch* msgWatch_; - DBusMessageDispatchSource* msgDispatchSource_; + DBusQueueWatch* queueWatch_; + DBusQueueDispatchSource* queueDispatchSource_; DispatchSource* dispatchSource_; WatchContext* watchContext_; + TimeoutContext* timeoutContext_; COMMONAPI_EXPORT void addLibdbusSignalMatchRule(const std::string& objectPath, const std::string& interfaceName, @@ -227,8 +294,8 @@ public: COMMONAPI_EXPORT void enforceAsynchronousTimeouts() const; COMMONAPI_EXPORT static const DBusObjectPathVTable* getDBusObjectPathVTable(); - COMMONAPI_EXPORT void sendPendingSelectiveSubscription(DBusProxy* proxy, std::string methodName, - DBusSignalHandler* dbusSignalHandler, uint32_t tag); + COMMONAPI_EXPORT void sendPendingSelectiveSubscription(DBusProxy* proxy, std::string interfaceMemberName, + DBusSignalHandler* dbusSignalHandler, uint32_t tag, std::string interfaceMemberSignature); ::DBusConnection* connection_; mutable std::mutex connectionGuard_; @@ -270,7 +337,7 @@ public: typedef std::pair< DBusPendingCall*, std::tuple< - std::chrono::time_point<std::chrono::high_resolution_clock>, + std::chrono::steady_clock::time_point, DBusMessageReplyAsyncHandler*, DBusMessage > @@ -278,7 +345,7 @@ public: mutable std::map< DBusPendingCall*, std::tuple< - std::chrono::time_point<std::chrono::high_resolution_clock>, + std::chrono::steady_clock::time_point, DBusMessageReplyAsyncHandler*, DBusMessage > @@ -320,6 +387,30 @@ public: }; +template<class Function, class... Arguments> +void FunctionQueueEntry<Function, Arguments ...>::process(std::shared_ptr<DBusConnection> _connection) { + _connection->processFunctionQueueEntry(*this); +} + +template<class Function, class... Arguments> +void FunctionQueueEntry<Function, Arguments ...>::clear() { +} + +template<class Function, class... Arguments> +void DBusConnection::processFunctionQueueEntry(FunctionQueueEntry<Function, Arguments ...> &_functionQueueEntry) { + _functionQueueEntry.bind_(); +} + +template<class Function, class... Arguments> +void DBusConnection::proxyPushFunctionToMainLoop(Function&& _function, Arguments&& ... _args) { + if (auto lockedContext = mainLoopContext_.lock()) { + std::shared_ptr<FunctionQueueEntry<Function, Arguments ...>> functionQueueEntry = std::make_shared<FunctionQueueEntry<Function, Arguments ...>>( + std::forward<Function>(_function), std::forward<Arguments>(_args) ...); + queueWatch_->pushQueue(functionQueueEntry); + } +} + + } // namespace DBus } // namespace CommonAPI diff --git a/include/CommonAPI/DBus/DBusDaemonProxy.hpp b/include/CommonAPI/DBus/DBusDaemonProxy.hpp index 0652654..44b882b 100644 --- a/include/CommonAPI/DBus/DBusDaemonProxy.hpp +++ b/include/CommonAPI/DBus/DBusDaemonProxy.hpp @@ -79,10 +79,25 @@ class DBusDaemonProxy : public DBusProxyBase, COMMONAPI_EXPORT std::future<CallStatus> listNamesAsync(typename DBusProxyAsyncCallbackHandler<DelegateObjectType, std::vector<std::string>>::Delegate& delegate) const { DBusMessage dbusMessage = createMethodCall("ListNames", ""); - return getDBusConnection()->sendDBusMessageWithReplyAsync( + + auto dbusMessageReplyAsyncHandler = std::move(DBusProxyAsyncCallbackHandler< + DelegateObjectType, std::vector< std::string > >::create(delegate, std::tuple<std::vector<std::string>>())); + + std::future<CallStatus> callStatusFuture; + try { + callStatusFuture = dbusMessageReplyAsyncHandler->getFuture(); + } catch (std::exception& e) { + COMMONAPI_ERROR("getNameOwnerAsync: messageReplyAsyncHandler future failed(", e.what(), ")"); + } + + if(getDBusConnection()->sendDBusMessageWithReplyAsync( dbusMessage, - DBusProxyAsyncCallbackHandler<DelegateObjectType, std::vector<std::string>>::create(delegate, std::tuple<std::vector<std::string>>()), - &daemonProxyInfo); + std::move(dbusMessageReplyAsyncHandler), + &daemonProxyInfo)) { + return callStatusFuture; + } else { + return std::future<CallStatus>(); + } } COMMONAPI_EXPORT void nameHasOwner(const std::string& busName, CommonAPI::CallStatus& callStatus, bool& hasOwner) const; @@ -102,10 +117,24 @@ class DBusDaemonProxy : public DBusProxyBase, } outputStream.flush(); - return getDBusConnection()->sendDBusMessageWithReplyAsync( + auto dbusMessageReplyAsyncHandler = std::move(DBusProxyAsyncCallbackHandler< + DelegateObjectType, bool >::create(delegate, std::tuple< bool >())); + + std::future<CallStatus> callStatusFuture; + try { + callStatusFuture = dbusMessageReplyAsyncHandler->getFuture(); + } catch (std::exception& e) { + COMMONAPI_ERROR("getNameOwnerAsync: messageReplyAsyncHandler future failed(", e.what(), ")"); + } + + if (getDBusConnection()->sendDBusMessageWithReplyAsync( dbusMessage, - DBusProxyAsyncCallbackHandler<DelegateObjectType, bool>::create(delegate, std::tuple<bool>()), - &daemonProxyInfo); + std::move(dbusMessageReplyAsyncHandler), + &daemonProxyInfo)) { + return callStatusFuture; + } else { + return std::future<CallStatus>(); + } } template <typename DelegateObjectType> @@ -146,10 +175,24 @@ class DBusDaemonProxy : public DBusProxyBase, } outputStream.flush(); - return getDBusConnection()->sendDBusMessageWithReplyAsync( + auto dbusMessageReplyAsyncHandler = std::move(DBusProxyAsyncCallbackHandler< + DelegateObjectType, std::string>::create(delegate, std::tuple<std::string>())); + + std::future<CallStatus> callStatusFuture; + try { + callStatusFuture = dbusMessageReplyAsyncHandler->getFuture(); + } catch (std::exception& e) { + COMMONAPI_ERROR("getNameOwnerAsync: messageReplyAsyncHandler future failed(", e.what(), ")"); + } + + if (getDBusConnection()->sendDBusMessageWithReplyAsync( dbusMessage, - DBusProxyAsyncCallbackHandler<DelegateObjectType, std::string>::create(delegate, std::tuple<std::string>()), - &daemonProxyInfo); + std::move(dbusMessageReplyAsyncHandler), + &daemonProxyInfo)) { + return callStatusFuture; + } else { + return std::future<CallStatus>(); + } } private: diff --git a/include/CommonAPI/DBus/DBusFactory.hpp b/include/CommonAPI/DBus/DBusFactory.hpp index 6ce0c7e..51ec476 100644 --- a/include/CommonAPI/DBus/DBusFactory.hpp +++ b/include/CommonAPI/DBus/DBusFactory.hpp @@ -89,14 +89,14 @@ public: COMMONAPI_EXPORT bool registerManagedService(const std::shared_ptr<DBusStubAdapter> &_adapter); COMMONAPI_EXPORT bool unregisterManagedService(const std::string &_address); - COMMONAPI_EXPORT void incrementConnection(std::shared_ptr<DBusProxyConnection>); COMMONAPI_EXPORT void decrementConnection(std::shared_ptr<DBusProxyConnection>); - COMMONAPI_EXPORT void releaseConnection(const ConnectionId_t&, MainLoopContext*); + COMMONAPI_EXPORT void releaseConnection(const ConnectionId_t&); // Initialization COMMONAPI_EXPORT void registerInterface(InterfaceInitFunction _function); private: + COMMONAPI_EXPORT void incrementConnection(std::shared_ptr<DBusProxyConnection>); COMMONAPI_EXPORT std::shared_ptr<DBusConnection> getConnection(const ConnectionId_t &); COMMONAPI_EXPORT std::shared_ptr<DBusConnection> getConnection(std::shared_ptr<MainLoopContext>); COMMONAPI_EXPORT bool registerStubAdapter(std::shared_ptr<DBusStubAdapter>); @@ -109,12 +109,9 @@ private: private: static std::shared_ptr<Factory> theFactory; - std::mutex connectionsMutex_; + std::recursive_mutex connectionsMutex_; std::map<ConnectionId_t, std::shared_ptr<DBusConnection>> connections_; - std::mutex contextConnectionsMutex_; - std::map<MainLoopContext *, std::shared_ptr<DBusConnection>> contextConnections_; - std::map<std::string, ProxyCreateFunction> proxyCreateFunctions_; std::map<std::string, StubAdapterCreateFunction> stubAdapterCreateFunctions_; diff --git a/include/CommonAPI/DBus/DBusFreedesktopStubAdapterHelper.hpp b/include/CommonAPI/DBus/DBusFreedesktopStubAdapterHelper.hpp index ec55f4e..d5abcca 100644 --- a/include/CommonAPI/DBus/DBusFreedesktopStubAdapterHelper.hpp +++ b/include/CommonAPI/DBus/DBusFreedesktopStubAdapterHelper.hpp @@ -41,7 +41,6 @@ class DBusGetFreedesktopAttributeStubDispatcher : public virtual DBusGetAttributeStubDispatcher<StubClass_, AttributeType_, AttributeDepl_>, public virtual DBusGetFreedesktopAttributeStubDispatcherBase<StubClass_> { public: - typedef DBusStubAdapterHelper<StubClass_> DBusStubAdapterHelperType; typedef typename DBusGetAttributeStubDispatcher<StubClass_, AttributeType_, AttributeDepl_>::GetStubFunctor GetStubFunctor; DBusGetFreedesktopAttributeStubDispatcher(GetStubFunctor _getStubFunctor, AttributeDepl_ *_depl = nullptr) @@ -63,7 +62,7 @@ public: } protected: - virtual bool sendAttributeValueReply(const DBusMessage &_message, const std::shared_ptr<StubClass_> &_stub, DBusStubAdapterHelperType &_helper) { + virtual bool sendAttributeValueReply(const DBusMessage &_message, const std::shared_ptr<StubClass_> &_stub, std::weak_ptr<DBusProxyConnection> connection_) { DBusMessage reply = _message.createMethodReturn(DBusGetAttributeStubDispatcher<StubClass_, AttributeType_, AttributeDepl_>::signature_); VariantDeployment<AttributeDepl_> actualDepl(true, DBusGetAttributeStubDispatcher<StubClass_, AttributeType_, AttributeDepl_>::depl_); @@ -74,8 +73,11 @@ protected: DBusOutputStream output(reply); output << deployedVariant; output.flush(); - - return _helper.getDBusConnection()->sendDBusMessage(reply); + if (std::shared_ptr<DBusProxyConnection> connection = connection_.lock()) { + return connection->sendDBusMessage(reply); + } else { + return false; + } } }; @@ -85,8 +87,7 @@ class DBusSetFreedesktopAttributeStubDispatcher public virtual DBusSetAttributeStubDispatcher<StubClass_, AttributeType_, AttributeDepl_> { public: typedef typename DBusGetAttributeStubDispatcher<StubClass_, AttributeType_, AttributeDepl_>::GetStubFunctor GetStubFunctor; - typedef typename DBusGetAttributeStubDispatcher<StubClass_, AttributeType_, AttributeDepl_>::DBusStubAdapterHelperType DBusStubAdapterHelperType; - typedef typename DBusStubAdapterHelperType::RemoteEventHandlerType RemoteEventHandlerType; + typedef typename StubClass_::RemoteEventHandlerType RemoteEventHandlerType; typedef bool (RemoteEventHandlerType::*OnRemoteSetFunctor)(std::shared_ptr<CommonAPI::ClientId>, AttributeType_); typedef void (RemoteEventHandlerType::*OnRemoteChangedFunctor)(); @@ -122,11 +123,10 @@ class DBusSetFreedesktopObservableAttributeStubDispatcher : public virtual DBusSetFreedesktopAttributeStubDispatcher<StubClass_, AttributeType_, AttributeDepl_>, public virtual DBusSetObservableAttributeStubDispatcher<StubClass_, AttributeType_, AttributeDepl_> { public: - typedef typename DBusSetFreedesktopAttributeStubDispatcher<StubClass_, AttributeType_, AttributeDepl_>::DBusStubAdapterHelperType DBusStubAdapterHelperType; - typedef typename DBusStubAdapterHelperType::StubAdapterType StubAdapterType; typedef typename DBusSetFreedesktopAttributeStubDispatcher<StubClass_, AttributeType_, AttributeDepl_>::GetStubFunctor GetStubFunctor; typedef typename DBusSetFreedesktopAttributeStubDispatcher<StubClass_, AttributeType_, AttributeDepl_>::OnRemoteSetFunctor OnRemoteSetFunctor; typedef typename DBusSetFreedesktopAttributeStubDispatcher<StubClass_, AttributeType_, AttributeDepl_>::OnRemoteChangedFunctor OnRemoteChangedFunctor; + typedef typename StubClass_::StubAdapterType StubAdapterType; typedef void (StubAdapterType::*FireChangedFunctor)(const AttributeType_&); DBusSetFreedesktopObservableAttributeStubDispatcher( @@ -177,7 +177,7 @@ struct DBusStubFreedesktopPropertiesSignalHelper { "PropertiesChanged", "sa{sv}as", _stub.getDBusConnection(), - _stub.getInterface(), + _stub.getDBusAddress().getInterface(), deployedChangedProperties, invalidatedProperties); } diff --git a/include/CommonAPI/DBus/DBusInputStream.hpp b/include/CommonAPI/DBus/DBusInputStream.hpp index ec6c7d0..61e4133 100644 --- a/include/CommonAPI/DBus/DBusInputStream.hpp +++ b/include/CommonAPI/DBus/DBusInputStream.hpp @@ -454,7 +454,7 @@ public: char * raw = _readRaw(sizeof(double)); if (!hasError()) { - _value = (float) (*(reinterpret_cast<double*>(raw))); + _value = float(*(reinterpret_cast<double*>(raw))); } return (*this); } diff --git a/include/CommonAPI/DBus/DBusInstanceAvailabilityStatusChangedEvent.hpp b/include/CommonAPI/DBus/DBusInstanceAvailabilityStatusChangedEvent.hpp index ec2d70f..1f5bcc2 100644 --- a/include/CommonAPI/DBus/DBusInstanceAvailabilityStatusChangedEvent.hpp +++ b/include/CommonAPI/DBus/DBusInstanceAvailabilityStatusChangedEvent.hpp @@ -15,11 +15,10 @@ #include <string> #include <vector> +#include <CommonAPI/Export.hpp> #include <CommonAPI/ProxyManager.hpp> -#include <CommonAPI/DBus/DBusAddressTranslator.hpp> #include <CommonAPI/DBus/DBusProxy.hpp> #include <CommonAPI/DBus/DBusObjectManagerStub.hpp> -#include <CommonAPI/DBus/DBusInstanceAvailabilityStatusChangedEvent.hpp> #include <CommonAPI/DBus/DBusTypes.hpp> namespace CommonAPI { @@ -30,115 +29,61 @@ class DBusInstanceAvailabilityStatusChangedEvent: public ProxyManager::InstanceAvailabilityStatusChangedEvent, public DBusProxyConnection::DBusSignalHandler { public: - DBusInstanceAvailabilityStatusChangedEvent(DBusProxy &_proxy, const std::string &_interfaceName) : - proxy_(_proxy), - observedInterfaceName_(_interfaceName) { - } - - virtual ~DBusInstanceAvailabilityStatusChangedEvent() { - proxy_.removeSignalMemberHandler(interfacesAddedSubscription_, this); - proxy_.removeSignalMemberHandler(interfacesRemovedSubscription_, this); - } - - virtual void onSignalDBusMessage(const DBusMessage& dbusMessage) { - if (dbusMessage.hasMemberName("InterfacesAdded")) { - onInterfacesAddedSignal(dbusMessage); - } else if (dbusMessage.hasMemberName("InterfacesRemoved")) { - onInterfacesRemovedSignal(dbusMessage); - } - } + + typedef std::function<void(const CallStatus &, const std::vector<DBusAddress> &)> GetAvailableServiceInstancesCallback; + + COMMONAPI_EXPORT DBusInstanceAvailabilityStatusChangedEvent(DBusProxy &_proxy, + const std::string &_dbusInterfaceName, + const std::string &_capiInterfaceName); + + COMMONAPI_EXPORT virtual ~DBusInstanceAvailabilityStatusChangedEvent(); + + COMMONAPI_EXPORT virtual void onSignalDBusMessage(const DBusMessage& dbusMessage); + + COMMONAPI_EXPORT void getAvailableServiceInstances(CommonAPI::CallStatus &_status, std::vector<DBusAddress> &_availableServiceInstances); + COMMONAPI_EXPORT std::future<CallStatus> getAvailableServiceInstancesAsync(GetAvailableServiceInstancesCallback _callback); + + COMMONAPI_EXPORT void getServiceInstanceAvailabilityStatus(const std::string &_instance, + CallStatus &_callStatus, + AvailabilityStatus &_availabilityStatus); + COMMONAPI_EXPORT std::future<CallStatus> getServiceInstanceAvailabilityStatusAsync(const std::string& _instance, + ProxyManager::GetInstanceAvailabilityStatusCallback _callback); protected: - virtual void onFirstListenerAdded(const Listener&) { - interfacesAddedSubscription_ = proxy_.addSignalMemberHandler( - proxy_.getDBusAddress().getObjectPath(), - DBusObjectManagerStub::getInterfaceName(), - "InterfacesAdded", - "oa{sa{sv}}", - this, - false); - - interfacesRemovedSubscription_ = proxy_.addSignalMemberHandler( - proxy_.getDBusAddress().getObjectPath(), - DBusObjectManagerStub::getInterfaceName(), - "InterfacesRemoved", - "oas", - this, - false); - } - - virtual void onLastListenerRemoved(const Listener&) { - proxy_.removeSignalMemberHandler(interfacesAddedSubscription_, this); - proxy_.removeSignalMemberHandler(interfacesRemovedSubscription_, this); - } + virtual void onFirstListenerAdded(const Listener&); + virtual void onLastListenerRemoved(const Listener&); private: - inline void onInterfacesAddedSignal(const DBusMessage &_message) { - DBusInputStream dbusInputStream(_message); - std::string dbusObjectPath; - std::string dbusInterfaceName; - DBusInterfacesAndPropertiesDict dbusInterfacesAndPropertiesDict; - - dbusInputStream >> dbusObjectPath; - if (dbusInputStream.hasError()) { - COMMONAPI_ERROR(std::string(__FUNCTION__) + " failed to read object path"); - } - - dbusInputStream.beginReadMapOfSerializableStructs(); - while (!dbusInputStream.readMapCompleted()) { - dbusInputStream.align(8); - dbusInputStream >> dbusInterfaceName; - dbusInputStream.skipMap(); - if (dbusInputStream.hasError()) { - COMMONAPI_ERROR(std::string(__FUNCTION__) + " failed to read interface name"); - } - if(dbusInterfaceName == observedInterfaceName_) { - notifyInterfaceStatusChanged(dbusObjectPath, dbusInterfaceName, AvailabilityStatus::AVAILABLE); - } - } - dbusInputStream.endReadMapOfSerializableStructs(); - } - - inline void onInterfacesRemovedSignal(const DBusMessage &_message) { - DBusInputStream dbusInputStream(_message); - std::string dbusObjectPath; - std::vector<std::string> dbusInterfaceNames; - - dbusInputStream >> dbusObjectPath; - if (dbusInputStream.hasError()) { - COMMONAPI_ERROR(std::string(__FUNCTION__) + " failed to read object path"); - } - - dbusInputStream >> dbusInterfaceNames; - if (dbusInputStream.hasError()) { - COMMONAPI_ERROR(std::string(__FUNCTION__) + " failed to read interface names"); - } - - for (const auto& dbusInterfaceName : dbusInterfaceNames) { - if(dbusInterfaceName == observedInterfaceName_) { - notifyInterfaceStatusChanged(dbusObjectPath, dbusInterfaceName, AvailabilityStatus::NOT_AVAILABLE); - } - } - } + + void onInterfacesAddedSignal(const DBusMessage &_message); + + void onInterfacesRemovedSignal(const DBusMessage &_message); void notifyInterfaceStatusChanged(const std::string &_objectPath, const std::string &_interfaceName, - const AvailabilityStatus &_availability) { - CommonAPI::Address itsAddress; - DBusAddress itsDBusAddress(proxy_.getDBusAddress().getService(), - _objectPath, - _interfaceName); + const AvailabilityStatus &_availability); - DBusAddressTranslator::get()->translate(itsDBusAddress, itsAddress); + bool addInterface(const std::string &_dbusObjectPath, + const std::string &_dbusInterfaceName); + bool removeInterface(const std::string &_dbusObjectPath, + const std::string &_dbusInterfaceName); - notifyListeners(itsAddress.getAddress(), _availability); - } + void serviceInstancesAsyncCallback(std::shared_ptr<Proxy> _proxy, + const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict _dict, + GetAvailableServiceInstancesCallback &_call, + std::shared_ptr<std::promise<CallStatus> > &_promise); + void translate(const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict &_dict, + std::vector<DBusAddress> &_serviceInstances); DBusProxy &proxy_; - std::string observedInterfaceName_; + std::string observedDbusInterfaceName_; + std::string observedCapiInterfaceName_; DBusProxyConnection::DBusSignalHandlerToken interfacesAddedSubscription_; DBusProxyConnection::DBusSignalHandlerToken interfacesRemovedSubscription_; + std::mutex interfacesMutex_; + std::map<std::string, std::set<std::string>> interfaces_; + const std::shared_ptr<DBusServiceRegistry> registry_; }; } // namespace DBus diff --git a/include/CommonAPI/DBus/DBusMainLoopContext.hpp b/include/CommonAPI/DBus/DBusMainLoopContext.hpp index c048c90..6de84f8 100644 --- a/include/CommonAPI/DBus/DBusMainLoopContext.hpp +++ b/include/CommonAPI/DBus/DBusMainLoopContext.hpp @@ -38,18 +38,18 @@ class DBusDispatchSource: public DispatchSource { DBusConnection* dbusConnection_; }; -class DBusMessageWatch; -class DBusMessageDispatchSource: public DispatchSource { +class DBusQueueWatch; +class DBusQueueDispatchSource: public DispatchSource { public: - DBusMessageDispatchSource(DBusMessageWatch* watch); - virtual ~DBusMessageDispatchSource(); + DBusQueueDispatchSource(DBusQueueWatch* watch); + virtual ~DBusQueueDispatchSource(); bool prepare(int64_t& timeout); bool check(); bool dispatch(); private: - DBusMessageWatch* watch_; + DBusQueueWatch* watch_; std::mutex watchMutex_; }; @@ -88,32 +88,13 @@ class DBusWatch: public Watch { #endif }; -class DBusMessageWatch : public Watch { -public: - - struct MsgQueueEntry { - MsgQueueEntry(DBusMessage _message) : - message_(_message) { } - DBusMessage message_; - - virtual void process(std::shared_ptr<DBusConnection> _connection) = 0; - virtual void clear(); - }; +struct QueueEntry; - struct MsgReplyQueueEntry : MsgQueueEntry { - MsgReplyQueueEntry(DBusProxyConnection::DBusMessageReplyAsyncHandler* _replyAsyncHandler, - DBusMessage _reply) : - MsgQueueEntry(_reply), - replyAsyncHandler_(_replyAsyncHandler) { } - - DBusProxyConnection::DBusMessageReplyAsyncHandler* replyAsyncHandler_; - - void process(std::shared_ptr<DBusConnection> _connection); - void clear(); - }; +class DBusQueueWatch : public Watch { +public: - DBusMessageWatch(std::shared_ptr<DBusConnection> _connection); - virtual ~DBusMessageWatch(); + DBusQueueWatch(std::shared_ptr<DBusConnection> _connection); + virtual ~DBusQueueWatch(); void dispatch(unsigned int eventFlags); @@ -129,24 +110,24 @@ public: void removeDependentDispatchSource(CommonAPI::DispatchSource* _dispatchSource); - void pushMsgQueue(std::shared_ptr<MsgQueueEntry> _queueEntry); + void pushQueue(std::shared_ptr<QueueEntry> _queueEntry); - void popMsgQueue(); + void popQueue(); - std::shared_ptr<MsgQueueEntry> frontMsgQueue(); + std::shared_ptr<QueueEntry> frontQueue(); - bool emptyMsgQueue(); + bool emptyQueue(); - void processMsgQueueEntry(std::shared_ptr<MsgQueueEntry> _queueEntry); + void processQueueEntry(std::shared_ptr<QueueEntry> _queueEntry); private: int pipeFileDescriptors_[2]; pollfd pollFileDescriptor_; std::vector<CommonAPI::DispatchSource*> dependentDispatchSources_; - std::queue<std::shared_ptr<MsgQueueEntry>> msgQueue_; + std::queue<std::shared_ptr<QueueEntry>> queue_; - std::mutex msgQueueMutex_; + std::mutex queueMutex_; std::weak_ptr<DBusConnection> connection_; @@ -161,7 +142,9 @@ private: class DBusTimeout: public Timeout { public: - DBusTimeout(::DBusTimeout* libdbusTimeout, std::weak_ptr<MainLoopContext>& mainLoopContext); + DBusTimeout(::DBusTimeout* libdbusTimeout, + std::weak_ptr<MainLoopContext>& mainLoopContext, + std::weak_ptr<DBusConnection>& dbusConnection); bool isReadyToBeMonitored(); void startMonitoring(); @@ -171,12 +154,23 @@ class DBusTimeout: public Timeout { int64_t getTimeoutInterval() const; int64_t getReadyTime() const; + + void setPendingCall(DBusPendingCall* _pendingCall); + +#ifdef WIN32 + __declspec(thread) static DBusTimeout *currentTimeout_; +#else + thread_local static DBusTimeout *currentTimeout_; +#endif + private: void recalculateDueTime(); int64_t dueTimeInMs_; ::DBusTimeout* libdbusTimeout_; std::weak_ptr<MainLoopContext> mainLoopContext_; + std::weak_ptr<DBusConnection> dbusConnection_; + DBusPendingCall *pendingCall_; }; diff --git a/include/CommonAPI/DBus/DBusProxy.hpp b/include/CommonAPI/DBus/DBusProxy.hpp index c8a57bc..072e356 100644 --- a/include/CommonAPI/DBus/DBusProxy.hpp +++ b/include/CommonAPI/DBus/DBusProxy.hpp @@ -30,9 +30,13 @@ class DBusProxyStatusEvent virtual ~DBusProxyStatusEvent() {} protected: - virtual void onListenerAdded(const Listener& listener, const Subscription subscription); + virtual void onListenerAdded(const Listener& _listener, const Subscription _subscription); + virtual void onListenerRemoved(const Listener &_listener, const Subscription _subscription); DBusProxy* dbusProxy_; + + std::recursive_mutex listenersMutex_; + std::vector<std::pair<ProxyStatusEvent::Subscription, ProxyStatusEvent::Listener>> listeners_; }; @@ -44,6 +48,8 @@ public: const std::shared_ptr<DBusProxyConnection> &_connection); COMMONAPI_EXPORT virtual ~DBusProxy(); + COMMONAPI_EXPORT AvailabilityStatus getAvailabilityStatus() const; + COMMONAPI_EXPORT virtual ProxyStatusEvent& getProxyStatusEvent(); COMMONAPI_EXPORT virtual InterfaceVersionAttribute& getInterfaceVersionAttribute(); @@ -61,8 +67,10 @@ public: DBusProxyConnection::DBusSignalHandler* dbusSignalHandler, uint32_t tag); - COMMONAPI_EXPORT void insertSelectiveSubscription(const std::string& interfaceMemberName, - DBusProxyConnection::DBusSignalHandler* dbusSignalHandler, uint32_t tag); + COMMONAPI_EXPORT void insertSelectiveSubscription( + const std::string& interfaceMemberName, + DBusProxyConnection::DBusSignalHandler* dbusSignalHandler, + uint32_t tag, std::string interfaceMemberSignature); COMMONAPI_EXPORT void unsubscribeFromSelectiveBroadcast(const std::string& eventName, DBusProxyConnection::DBusSignalHandlerToken subscription, const DBusProxyConnection::DBusSignalHandler* dbusSignalHandler); @@ -101,6 +109,10 @@ public: const std::string &interfaceName, const std::string &propertyName); + COMMONAPI_EXPORT virtual void notifySpecificListener(std::weak_ptr<DBusProxy> _dbusProxy, + const ProxyStatusEvent::Listener &_listener, + const ProxyStatusEvent::Subscription _subscription); + private: typedef std::tuple< const std::string, @@ -142,7 +154,9 @@ private: std::list<SignalMemberHandlerTuple> signalMemberHandlerQueue_; mutable std::mutex signalMemberHandlerQueueMutex_; - std::map<std::string, std::pair<DBusProxyConnection::DBusSignalHandler*, uint32_t>> selectiveBroadcastHandlers; + std::map<std::string, + std::tuple<DBusProxyConnection::DBusSignalHandler*, uint32_t, + std::string>> selectiveBroadcastHandlers; mutable std::mutex selectiveBroadcastHandlersMutex_; mutable std::shared_ptr<std::thread> availabilityTimeoutThread_; @@ -151,11 +165,13 @@ private: mutable std::condition_variable availabilityTimeoutCondition_; typedef std::tuple< - std::chrono::time_point<std::chrono::high_resolution_clock>, + std::chrono::steady_clock::time_point, isAvailableAsyncCallback, std::promise<AvailabilityStatus> > AvailabilityTimeout_t; mutable std::list<AvailabilityTimeout_t> timeouts_; + + std::weak_ptr<DBusProxy> selfReference_; }; diff --git a/include/CommonAPI/DBus/DBusProxyAsyncCallbackHandler.hpp b/include/CommonAPI/DBus/DBusProxyAsyncCallbackHandler.hpp index c36ccb1..1a00adc 100644 --- a/include/CommonAPI/DBus/DBusProxyAsyncCallbackHandler.hpp +++ b/include/CommonAPI/DBus/DBusProxyAsyncCallbackHandler.hpp @@ -53,7 +53,10 @@ class DBusProxyAsyncCallbackHandler: timeoutOccurred_(false), hasToBeDeleted_(false) { } - virtual ~DBusProxyAsyncCallbackHandler() {} + virtual ~DBusProxyAsyncCallbackHandler() { + // free assigned std::function<> immediately + delegate_.function_ = [](CallStatus, ArgTypes_...) {}; + } virtual std::future<CallStatus> getFuture() { return promise_.get_future(); @@ -76,8 +79,6 @@ class DBusProxyAsyncCallbackHandler: virtual void setExecutionFinished() { executionFinished_ = true; - // free assigned std::function<> immediately - delegate_.function_ = [](CallStatus, ArgTypes_...) {}; } virtual bool getExecutionFinished() { diff --git a/include/CommonAPI/DBus/DBusProxyConnection.hpp b/include/CommonAPI/DBus/DBusProxyConnection.hpp index 8f7a1c8..6065a8b 100644 --- a/include/CommonAPI/DBus/DBusProxyConnection.hpp +++ b/include/CommonAPI/DBus/DBusProxyConnection.hpp @@ -83,7 +83,7 @@ class DBusProxyConnection { virtual bool sendDBusMessage(const DBusMessage& dbusMessage) const = 0; - virtual std::future<CallStatus> sendDBusMessageWithReplyAsync( + virtual bool sendDBusMessageWithReplyAsync( const DBusMessage& dbusMessage, std::unique_ptr<DBusMessageReplyAsyncHandler> dbusMessageReplyAsyncHandler, const CommonAPI::CallInfo *_info) const = 0; @@ -137,11 +137,18 @@ class DBusProxyConnection { virtual bool hasDispatchThread() = 0; - virtual void sendPendingSelectiveSubscription(DBusProxy* proxy, std::string methodName, - DBusSignalHandler* dbusSignalHandler, uint32_t tag) = 0; + virtual void sendPendingSelectiveSubscription( + DBusProxy* proxy, std::string interfaceMemberName, + DBusSignalHandler* dbusSignalHandler, uint32_t tag, + std::string interfaceMemberSignature) = 0; - virtual void pushDBusMessageReply(const DBusMessage& reply, + virtual void pushDBusMessageReplyToMainLoop(const DBusMessage& reply, std::unique_ptr<DBusMessageReplyAsyncHandler> dbusMessageReplyAsyncHandler) = 0; + + template<class DBusConnection, class Function, class... Arguments> + void proxyPushFunctionToMainLoop(Function&& _function, Arguments&& ... _args) { + static_cast<DBusConnection*>(this)->proxyPushFunctionToMainLoop(std::forward<Function>(_function), std::forward<Arguments>(_args) ...); + } }; } // namespace DBus diff --git a/include/CommonAPI/DBus/DBusProxyHelper.hpp b/include/CommonAPI/DBus/DBusProxyHelper.hpp index cb5b942..8bd0154 100644 --- a/include/CommonAPI/DBus/DBusProxyHelper.hpp +++ b/include/CommonAPI/DBus/DBusProxyHelper.hpp @@ -196,19 +196,33 @@ struct DBusProxyHelper<In_<DBusInputStream, DBusOutputStream, InArgs_...>, typename DBusProxyAsyncCallbackHandler< DBusProxy, OutArgs_... >::Delegate delegate(_proxy.shared_from_this(), _function); - auto dbusMessageReplyAsyncHandler = DBusProxyAsyncCallbackHandler< + auto dbusMessageReplyAsyncHandler = std::move(DBusProxyAsyncCallbackHandler< DBusProxy, OutArgs_... - >::create(delegate, _out).release(); + >::create(delegate, _out)); + + std::future<CallStatus> callStatusFuture; + try { + callStatusFuture = dbusMessageReplyAsyncHandler->getFuture(); + } catch (std::exception& e) { + COMMONAPI_ERROR("MethodAsync(dbus): messageReplyAsyncHandler future failed(", e.what(), ")"); + } if(_proxy.isAvailable()) { - return _proxy.getDBusConnection()->sendDBusMessageWithReplyAsync( + if (_proxy.getDBusConnection()->sendDBusMessageWithReplyAsync( _message, - std::unique_ptr<DBusProxyConnection::DBusMessageReplyAsyncHandler>(dbusMessageReplyAsyncHandler), - _info); + std::move(dbusMessageReplyAsyncHandler), + _info)){ + COMMONAPI_VERBOSE("MethodAsync(dbus): Proxy available -> sendMessageWithReplyAsync"); + return callStatusFuture; + } else { + return std::future<CallStatus>(); + } } else { + std::shared_ptr< std::unique_ptr< DBusProxyConnection::DBusMessageReplyAsyncHandler > > sharedDbusMessageReplyAsyncHandler( + new std::unique_ptr< DBusProxyConnection::DBusMessageReplyAsyncHandler >(std::move(dbusMessageReplyAsyncHandler))); //async isAvailable call with timeout - _proxy.isAvailableAsync([&_proxy, _message, _info, - _out, dbusMessageReplyAsyncHandler, _function]( + COMMONAPI_VERBOSE("MethodAsync(dbus): Proxy not available -> register calback"); + _proxy.isAvailableAsync([&_proxy, _message, sharedDbusMessageReplyAsyncHandler]( const AvailabilityStatus _status, const Timeout_t remaining) { if(_status == AvailabilityStatus::AVAILABLE) { @@ -217,20 +231,30 @@ struct DBusProxyHelper<In_<DBusInputStream, DBusOutputStream, InArgs_...>, if(remaining < 100) newTimeout = 100; CallInfo newInfo(newTimeout); + if(*sharedDbusMessageReplyAsyncHandler) { _proxy.getDBusConnection()->sendDBusMessageWithReplyAsync( _message, - std::unique_ptr<DBusProxyConnection::DBusMessageReplyAsyncHandler>(dbusMessageReplyAsyncHandler), + std::move(*sharedDbusMessageReplyAsyncHandler), &newInfo); + COMMONAPI_VERBOSE("MethodAsync(dbus): Proxy callback available -> sendMessageWithReplyAsync"); + } else { + COMMONAPI_ERROR("MethodAsync(dbus): Proxy callback available but callback taken"); + } } else { //create error message and push it directly to the connection unsigned int dummySerial = 999; _message.setSerial(dummySerial); //set dummy serial + if (*sharedDbusMessageReplyAsyncHandler) { DBusMessage errorMessage = _message.createMethodError(DBUS_ERROR_UNKNOWN_METHOD); - _proxy.getDBusConnection()->pushDBusMessageReply(errorMessage, - std::unique_ptr<DBusProxyConnection::DBusMessageReplyAsyncHandler>(dbusMessageReplyAsyncHandler)); + _proxy.getDBusConnection()->pushDBusMessageReplyToMainLoop(errorMessage, + std::move(*sharedDbusMessageReplyAsyncHandler)); + COMMONAPI_VERBOSE("MethodAsync(dbus): Proxy callback not reachable -> sendMessageWithReplyAsync"); + } else { + COMMONAPI_ERROR("MethodAsync(dbus): Proxy callback not reachable but callback taken"); + } } }, _info); - return dbusMessageReplyAsyncHandler->getFuture(); + return callStatusFuture; } } diff --git a/include/CommonAPI/DBus/DBusProxyManager.hpp b/include/CommonAPI/DBus/DBusProxyManager.hpp index ff28a26..1dd8dbe 100644 --- a/include/CommonAPI/DBus/DBusProxyManager.hpp +++ b/include/CommonAPI/DBus/DBusProxyManager.hpp @@ -29,41 +29,39 @@ namespace DBus { class COMMONAPI_EXPORT_CLASS_EXPLICIT DBusProxyManager: public ProxyManager { public: COMMONAPI_EXPORT DBusProxyManager(DBusProxy &_proxy, - const std::string &_interfaceName); + const std::string &_dbusInterfaceName, + const std::string &_capiInterfaceName); COMMONAPI_EXPORT const std::string &getDomain() const; COMMONAPI_EXPORT const std::string &getInterface() const; COMMONAPI_EXPORT const ConnectionId_t &getConnectionId() const; - COMMONAPI_EXPORT virtual void getAvailableInstances(CommonAPI::CallStatus &, std::vector<std::string> &_instances); + COMMONAPI_EXPORT virtual void getAvailableInstances(CommonAPI::CallStatus &_status, std::vector<std::string> &_instances); COMMONAPI_EXPORT virtual std::future<CallStatus> getAvailableInstancesAsync(GetAvailableInstancesCallback _callback); - COMMONAPI_EXPORT virtual void getInstanceAvailabilityStatus(const std::string &_address, + COMMONAPI_EXPORT virtual void getInstanceAvailabilityStatus(const std::string &_instance, CallStatus &_callStatus, AvailabilityStatus &_availabilityStatus); COMMONAPI_EXPORT virtual std::future<CallStatus> getInstanceAvailabilityStatusAsync( - const std::string&, - GetInstanceAvailabilityStatusCallback callback); + const std::string& _instance, + GetInstanceAvailabilityStatusCallback _callback); COMMONAPI_EXPORT virtual InstanceAvailabilityStatusChangedEvent& getInstanceAvailabilityStatusChangedEvent(); private: - COMMONAPI_EXPORT void instancesAsyncCallback(const CommonAPI::CallStatus& status, - const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict& dict, - GetAvailableInstancesCallback& call); + COMMONAPI_EXPORT void instancesAsyncCallback(std::shared_ptr<Proxy> _proxy, + const CommonAPI::CallStatus &_status, + const std::vector<DBusAddress> &_availableServiceInstances, + GetAvailableInstancesCallback &_call); - COMMONAPI_EXPORT void instanceAliveAsyncCallback(const AvailabilityStatus &_alive, - GetInstanceAvailabilityStatusCallback &_call, - std::shared_ptr<std::promise<CallStatus>> &_status); - - COMMONAPI_EXPORT void translateCommonApiAddresses(const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict &_dict, + COMMONAPI_EXPORT void translate(const std::vector<DBusAddress> &_serviceInstances, std::vector<std::string> &_instances); DBusProxy &proxy_; DBusInstanceAvailabilityStatusChangedEvent instanceAvailabilityStatusEvent_; - const std::string interfaceId_; - const std::shared_ptr<DBusServiceRegistry> registry_; + const std::string dbusInterfaceId_; + const std::string capiInterfaceId_; ConnectionId_t connectionId_; }; diff --git a/include/CommonAPI/DBus/DBusSelectiveEvent.hpp b/include/CommonAPI/DBus/DBusSelectiveEvent.hpp index 627a17b..1c1ce85 100644 --- a/include/CommonAPI/DBus/DBusSelectiveEvent.hpp +++ b/include/CommonAPI/DBus/DBusSelectiveEvent.hpp @@ -42,7 +42,7 @@ public: virtual void setSubscriptionToken(const DBusProxyConnection::DBusSignalHandlerToken _subscriptionToken, uint32_t tag) { this->subscription_ = _subscriptionToken; - static_cast<DBusProxy&>(this->proxy_).insertSelectiveSubscription(this->name_, this, tag); + static_cast<DBusProxy&>(this->proxy_).insertSelectiveSubscription(this->name_, this, tag, this->signature_); } protected: diff --git a/include/CommonAPI/DBus/DBusServiceRegistry.hpp b/include/CommonAPI/DBus/DBusServiceRegistry.hpp index 17e7139..00dd355 100644 --- a/include/CommonAPI/DBus/DBusServiceRegistry.hpp +++ b/include/CommonAPI/DBus/DBusServiceRegistry.hpp @@ -67,6 +67,8 @@ class DBusServiceRegistry: public std::enable_shared_from_this<DBusServiceRegist typedef std::list<DBusManagedInterfaceListener> DBusManagedInterfaceListenerList; typedef DBusManagedInterfaceListenerList::iterator DBusManagedInterfaceSubscription; + typedef std::function<void(const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict)> GetAvailableServiceInstancesCallback; + static std::shared_ptr<DBusServiceRegistry> get(std::shared_ptr<DBusProxyConnection> _connection); static void remove(std::shared_ptr<DBusProxyConnection> _connection); @@ -91,12 +93,13 @@ class DBusServiceRegistry: public std::enable_shared_from_this<DBusServiceRegist const std::string &_dbusObjectPath); - virtual std::vector<std::string> getAvailableServiceInstances(const std::string &_interface, - const std::string &_domain = "local"); + virtual void getAvailableServiceInstances(const std::string& dbusServiceName, + const std::string& dbusObjectPath, + DBusObjectManagerStub::DBusObjectPathAndInterfacesDict& availableServiceInstances); - virtual void getAvailableServiceInstancesAsync(CommonAPI::Factory::AvailableInstancesCbk_t _cbk, - const std::string &_interface, - const std::string &_domain = "local"); + virtual void getAvailableServiceInstancesAsync(GetAvailableServiceInstancesCallback callback, + const std::string& dbusServiceName, + const std::string& dbusObjectPath); virtual void onSignalDBusMessage(const DBusMessage&); @@ -110,11 +113,13 @@ class DBusServiceRegistry: public std::enable_shared_from_this<DBusServiceRegist DBusInterfaceNameListenersRecord(DBusInterfaceNameListenersRecord &&_other) : state(_other.state), - listenerList(std::move(_other.listenerList)) { + listenerList(std::move(_other.listenerList)), + listenersToRemove(std::move(_other.listenersToRemove)) { } DBusRecordState state; DBusServiceListenerList listenerList; + std::list<DBusServiceSubscription> listenersToRemove; }; typedef std::unordered_map<std::string, DBusInterfaceNameListenersRecord> DBusInterfaceNameListenersMap; @@ -200,7 +205,7 @@ class DBusServiceRegistry: public std::enable_shared_from_this<DBusServiceRegist std::unordered_map<std::string, DBusUniqueNameRecord*> dbusServiceNameMap_; // protects the dbus service maps - std::mutex dbusServicesMutex_; + std::recursive_mutex dbusServicesMutex_; void resolveDBusServiceName(const std::string& dbusServiceName, DBusServiceListenersRecord& dbusServiceListenersRecord); @@ -222,24 +227,23 @@ class DBusServiceRegistry: public std::enable_shared_from_this<DBusServiceRegist void releaseDBusObjectPathCacheReference(const std::string& dbusObjectPath, const DBusServiceListenersRecord& dbusServiceListenersRecord); + bool resolveObjectPathWithObjectManager(const std::string& dbusServiceUniqueName, const std::string& dbusObjectPath); + typedef std::function<void(const CallStatus&, const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict, const std::string&, const std::string&)> GetManagedObjectsCallback; - bool resolveObjectPathWithObjectManager(const std::string& dbusServiceUniqueName, const std::string& dbusObjectPath); - - bool getManagedObjects(const std::string& dbusServiceUniqueName, + bool getManagedObjects(const std::string& dbusServiceName, const std::string& dbusObjectPath, - GetManagedObjectsCallback callback); + DBusObjectManagerStub::DBusObjectPathAndInterfacesDict& availableServiceInstances); - void onGetManagedObjectsCallbackResolve(const CallStatus& callStatus, - const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict dbusObjectPathAndInterfacesDict, - const std::string& dbusServiceUniqueName, - const std::string& dbusObjectPath); + bool getManagedObjectsAsync(const std::string& dbusServiceName, + const std::string& dbusObjectPath, + GetManagedObjectsCallback callback); - void onGetManagedObjectsCallbackResolveFurther(const CallStatus& callStatus, - const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict dbusObjectPathAndInterfacesDict, + void onGetManagedObjectsCallbackResolve(const CallStatus& callStatus, + const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict availableServiceInstances, const std::string& dbusServiceUniqueName, const std::string& dbusObjectPath); @@ -247,33 +251,6 @@ class DBusServiceRegistry: public std::enable_shared_from_this<DBusServiceRegist const std::string& dbusServiceUniqueName, const std::string& interfaceName); - bool introspectDBusObjectPath(const std::string& dbusServiceUniqueName, const std::string& dbusObjectPath); - - void onIntrospectCallback(const CallStatus& status, - std::string xmlData, - const std::string& dbusServiceName, - const std::string& dbusObjectPath); - - void parseIntrospectionData(const std::string& xmlData, - const std::string& rootObjectPath, - const std::string& dbusServiceUniqueName); - - void parseIntrospectionNode(const pugi::xml_node& node, - const std::string& rootObjectPath, - const std::string& fullObjectPath, - const std::string& dbusServiceUniqueName); - - void processIntrospectionObjectPath(const pugi::xml_node& node, - const std::string& rootObjectPath, - const std::string& dbusServiceUniqueName); - - void processIntrospectionInterface(const pugi::xml_node& node, - const std::string& rootObjectPath, - const std::string& fullObjectPath, - const std::string& dbusServiceUniqueName); - - void onDBusDaemonProxyStatusEvent(const AvailabilityStatus& availabilityStatus); - void onDBusDaemonProxyNameOwnerChangedEvent(const std::string& name, const std::string& oldOwner, const std::string& newOwner); @@ -313,14 +290,6 @@ class DBusServiceRegistry: public std::enable_shared_from_this<DBusServiceRegist bool findCachedDbusService(const std::string& dbusServiceName, DBusUniqueNameRecord** uniqueNameRecord); bool findCachedObjectPath(const std::string& dbusObjectPathName, const DBusUniqueNameRecord* uniqueNameRecord, DBusObjectPathCache* objectPathCache); - std::condition_variable monitorResolveAllServices_; - std::mutex mutexServiceResolveCount; - int servicesToResolve; - - std::condition_variable monitorResolveAllObjectPaths_; - std::mutex mutexObjectPathsResolveCount; - int objectPathsToResolve; - void fetchAllServiceNames(); inline bool isDBusServiceName(const std::string &_name) { diff --git a/include/CommonAPI/DBus/DBusStubAdapterHelper.hpp b/include/CommonAPI/DBus/DBusStubAdapterHelper.hpp index 42d7301..59d33bc 100644 --- a/include/CommonAPI/DBus/DBusStubAdapterHelper.hpp +++ b/include/CommonAPI/DBus/DBusStubAdapterHelper.hpp @@ -27,59 +27,112 @@ namespace CommonAPI { namespace DBus { -class StubDispatcherBase { +template <typename StubClass_> +class StubDispatcher { public: - virtual ~StubDispatcherBase() { } -}; - + typedef typename StubClass_::RemoteEventHandlerType RemoteEventHandlerType; + virtual ~StubDispatcher() {} + virtual bool dispatchDBusMessage(const DBusMessage &_message, + const std::shared_ptr<StubClass_> &_stub, + RemoteEventHandlerType* _remoteEventHandler, + std::weak_ptr<DBusProxyConnection> _connection_) = 0; + virtual void appendGetAllReply(const DBusMessage &_message, + const std::shared_ptr<StubClass_> &_stub, + DBusOutputStream &_output) { + (void)_message; + (void)_stub; + (void)_output; + } +}; +template <typename StubClass_> struct DBusAttributeDispatcherStruct { - StubDispatcherBase* getter; - StubDispatcherBase* setter; + StubDispatcher<StubClass_>* getter; + StubDispatcher<StubClass_>* setter; - DBusAttributeDispatcherStruct(StubDispatcherBase* g, StubDispatcherBase* s) { + DBusAttributeDispatcherStruct(StubDispatcher<StubClass_>* g, StubDispatcher<StubClass_>* s) { getter = g; setter = s; } }; -typedef std::unordered_map<std::string, DBusAttributeDispatcherStruct> StubAttributeTable; +template <typename T> +struct identity { typedef T type; }; -template <typename StubClass_> -class DBusStubAdapterHelper: public virtual DBusStubAdapter { +// interfaceMemberName, interfaceMemberSignature +typedef std::pair<const char*, const char*> DBusInterfaceMemberPath; + +template <typename... Stubs_> +class DBusStubAdapterHelper { +public: + DBusStubAdapterHelper(const DBusAddress &_address, + const std::shared_ptr<DBusProxyConnection> &_connection, + const bool _isManaging, + const std::shared_ptr<StubBase> &_stub) { + (void)_address; + (void)_connection; + (void) _isManaging; + (void) _stub; + } +protected: + bool findDispatcherAndHandle(const DBusMessage& dbusMessage, DBusInterfaceMemberPath& dbusInterfaceMemberPath) { + (void) dbusMessage; + (void) dbusInterfaceMemberPath; + return false; + } + bool findAttributeGetDispatcherAndHandle(std::string interfaceName, std::string attributeName, const DBusMessage &_message) { + (void) interfaceName; + (void) attributeName; + (void) _message; + return false; + } + bool findAttributeSetDispatcherAndHandle(std::string interfaceName, std::string attributeName, const DBusMessage &_message) { + (void) interfaceName; + (void) attributeName; + (void) _message; + return false; + } + bool appendGetAllReply(const DBusMessage& dbusMessage, DBusOutputStream& dbusOutputStream) { + (void) dbusMessage; + (void) dbusOutputStream; + return true; + } +public: + template <typename Stub_> + void addStubDispatcher(DBusInterfaceMemberPath _dbusInterfaceMemberPath, + StubDispatcher<Stub_>* _stubDispatcher) { + (void) _dbusInterfaceMemberPath; + (void) _stubDispatcher; + } + template <typename RemoteEventHandlerType> + void setRemoteEventHandler(RemoteEventHandlerType * _remoteEventHandler) { + (void) _remoteEventHandler; + } + +}; + +template <typename StubClass_, typename... Stubs_> +class DBusStubAdapterHelper<StubClass_, Stubs_...>: + public virtual DBusStubAdapter, + public DBusStubAdapterHelper<Stubs_...> { public: typedef typename StubClass_::StubAdapterType StubAdapterType; typedef typename StubClass_::RemoteEventHandlerType RemoteEventHandlerType; - class StubDispatcher: public StubDispatcherBase { - public: - virtual ~StubDispatcher() {} - virtual bool dispatchDBusMessage(const DBusMessage &_message, - const std::shared_ptr<StubClass_> &_stub, - DBusStubAdapterHelper<StubClass_> &_helper) = 0; - virtual void appendGetAllReply(const DBusMessage &_message, - const std::shared_ptr<StubClass_> &_stub, - DBusStubAdapterHelper<StubClass_> &_helper, - DBusOutputStream &_output) { - (void)_message; - (void)_stub; - (void)_helper; - (void)_output; - } - }; - // interfaceMemberName, interfaceMemberSignature - typedef std::pair<const char*, const char*> DBusInterfaceMemberPath; - typedef std::unordered_map<DBusInterfaceMemberPath, StubDispatcherBase*> StubDispatcherTable; + typedef std::unordered_map<DBusInterfaceMemberPath, StubDispatcher<StubClass_>*> StubDispatcherTable; + typedef std::unordered_map<std::string, DBusAttributeDispatcherStruct<StubClass_>> StubAttributeTable; DBusStubAdapterHelper(const DBusAddress &_address, const std::shared_ptr<DBusProxyConnection> &_connection, - const std::shared_ptr<StubClass_> &_stub, - const bool _isManaging): + const bool _isManaging, + const std::shared_ptr<StubBase> &_stub) : + DBusStubAdapter(_address, _connection, _isManaging), - stub_(_stub), + DBusStubAdapterHelper<Stubs_...>(_address, _connection, _isManaging, _stub), remoteEventHandler_(nullptr) { + stub_ = std::dynamic_pointer_cast<StubClass_>(_stub); } virtual ~DBusStubAdapterHelper() { @@ -91,6 +144,12 @@ class DBusStubAdapterHelper: public virtual DBusStubAdapter { DBusStubAdapter::init(instance); std::shared_ptr<StubAdapterType> stubAdapter = std::dynamic_pointer_cast<StubAdapterType>(instance); remoteEventHandler_ = stub_->initStubAdapter(stubAdapter); + DBusStubAdapterHelper<Stubs_...>::setRemoteEventHandler(remoteEventHandler_); + } + + void setRemoteEventHandler(RemoteEventHandlerType* _remoteEventHandler) { + remoteEventHandler_ = _remoteEventHandler; + DBusStubAdapterHelper<Stubs_...>::setRemoteEventHandler(remoteEventHandler_); } virtual void deinit() { @@ -115,16 +174,21 @@ class DBusStubAdapterHelper: public virtual DBusStubAdapter { COMMONAPI_ERROR(std::string(__FUNCTION__), " signature empty"); } - DBusInterfaceMemberPath dbusInterfaceMemberPath(interfaceMemberName, interfaceMemberSignature); - auto findIterator = getStubDispatcherTable().find(dbusInterfaceMemberPath); - const bool foundInterfaceMemberHandler = (findIterator != getStubDispatcherTable().end()); + DBusInterfaceMemberPath dbusInterfaceMemberPath = {interfaceMemberName, interfaceMemberSignature}; + return findDispatcherAndHandle(dbusMessage, dbusInterfaceMemberPath); + } + + bool findDispatcherAndHandle(const DBusMessage& dbusMessage, DBusInterfaceMemberPath& dbusInterfaceMemberPath) { + auto findIterator = stubDispatcherTable_.find(dbusInterfaceMemberPath); + const bool foundInterfaceMemberHandler = (findIterator != stubDispatcherTable_.end()); bool dbusMessageHandled = false; if (foundInterfaceMemberHandler) { - StubDispatcher* stubDispatcher = static_cast<StubDispatcher*>(findIterator->second); - dbusMessageHandled = stubDispatcher->dispatchDBusMessage(dbusMessage, stub_, *this); + StubDispatcher<StubClass_>* stubDispatcher = findIterator->second; + dbusMessageHandled = stubDispatcher->dispatchDBusMessage(dbusMessage, stub_, getRemoteEventHandler(), getDBusConnection()); + return dbusMessageHandled; } - return dbusMessageHandled; + return DBusStubAdapterHelper<Stubs_...>::findDispatcherAndHandle(dbusMessage, dbusInterfaceMemberPath); } virtual bool onInterfaceDBusFreedesktopPropertiesMessage(const DBusMessage &_message) { @@ -141,13 +205,26 @@ class DBusStubAdapterHelper: public virtual DBusStubAdapter { return false; } - virtual const StubDispatcherTable& getStubDispatcherTable() = 0; - virtual const StubAttributeTable& getStubAttributeTable() = 0; + template <typename Stub_> + void addStubDispatcher(DBusInterfaceMemberPath _dbusInterfaceMemberPath, + StubDispatcher<Stub_>* _stubDispatcher) { + addStubDispatcher(_dbusInterfaceMemberPath, _stubDispatcher, identity<Stub_>()); + } + + template <typename Stub_> + void addAttributeDispatcher(std::string _key, + StubDispatcher<Stub_>* _stubDispatcherGetter, + StubDispatcher<Stub_>* _stubDispatcherSetter) { + addAttributeDispatcher(_key, _stubDispatcherGetter, _stubDispatcherSetter, identity<Stub_>()); + } std::shared_ptr<StubClass_> stub_; RemoteEventHandlerType* remoteEventHandler_; + StubDispatcherTable stubDispatcherTable_; + StubAttributeTable stubAttributeTable_; + +protected: - private: bool handleFreedesktopGet(const DBusMessage &_message, DBusInputStream &_input) { std::string interfaceName; std::string attributeName; @@ -157,18 +234,23 @@ class DBusStubAdapterHelper: public virtual DBusStubAdapter { if (_input.hasError()) { return false; } + return findAttributeGetDispatcherAndHandle(interfaceName, attributeName, _message); + } - auto attributeDispatcherIterator = getStubAttributeTable().find(attributeName); - if (attributeDispatcherIterator == getStubAttributeTable().end()) { - return false; + bool findAttributeGetDispatcherAndHandle(std::string interfaceName, std::string attributeName, const DBusMessage &_message) { + + auto attributeDispatcherIterator = stubAttributeTable_.find(attributeName); + if (attributeDispatcherIterator == stubAttributeTable_.end()) { + // not found, try parent + return DBusStubAdapterHelper<Stubs_...>::findAttributeGetDispatcherAndHandle(interfaceName, attributeName, _message); } - StubDispatcher* getterDispatcher = static_cast<StubDispatcher*>(attributeDispatcherIterator->second.getter); + StubDispatcher<StubClass_>* getterDispatcher = attributeDispatcherIterator->second.getter; if (NULL == getterDispatcher) { // all attributes have at least a getter COMMONAPI_ERROR(std::string(__FUNCTION__), "getterDispatcher == NULL"); return false; } else { - return (getterDispatcher->dispatchDBusMessage(_message, stub_, *this)); + return getterDispatcher->dispatchDBusMessage(_message, stub_, getRemoteEventHandler(), getDBusConnection()); } } @@ -182,54 +264,98 @@ class DBusStubAdapterHelper: public virtual DBusStubAdapter { return false; } - auto attributeDispatcherIterator = getStubAttributeTable().find(attributeName); - if(attributeDispatcherIterator == getStubAttributeTable().end()) { - return false; - } + return findAttributeSetDispatcherAndHandle(interfaceName, attributeName, dbusMessage); + } - StubDispatcher *setterDispatcher = static_cast<StubDispatcher*>(attributeDispatcherIterator->second.setter); - if (setterDispatcher == NULL) { // readonly attributes do not have a setter - return false; - } + bool findAttributeSetDispatcherAndHandle(std::string interfaceName, std::string attributeName, const DBusMessage& dbusMessage) { - return setterDispatcher->dispatchDBusMessage(dbusMessage, stub_, *this); - } + auto attributeDispatcherIterator = stubAttributeTable_.find(attributeName); + if(attributeDispatcherIterator == stubAttributeTable_.end()) { + // not found, try parent + return DBusStubAdapterHelper<Stubs_...>::findAttributeSetDispatcherAndHandle(interfaceName, attributeName, dbusMessage); - bool handleFreedesktopGetAll(const DBusMessage& dbusMessage, DBusInputStream& dbusInputStream) { - std::string interfaceName; - dbusInputStream >> interfaceName; + } - if(dbusInputStream.hasError()) { + StubDispatcher<StubClass_> *setterDispatcher = attributeDispatcherIterator->second.setter; + if (setterDispatcher == NULL) { // readonly attributes do not have a setter return false; } - DBusMessage dbusMessageReply = dbusMessage.createMethodReturn("a{sv}"); - DBusOutputStream dbusOutputStream(dbusMessageReply); - - dbusOutputStream.beginWriteMap(); + return setterDispatcher->dispatchDBusMessage(dbusMessage, stub_, getRemoteEventHandler(), getDBusConnection()); + } - std::shared_ptr<DBusClientId> clientId = std::make_shared<DBusClientId>(std::string(dbusMessage.getSender())); - for(auto attributeDispatcherIterator = getStubAttributeTable().begin(); attributeDispatcherIterator != getStubAttributeTable().end(); attributeDispatcherIterator++) { + bool appendGetAllReply(const DBusMessage& dbusMessage, DBusOutputStream& dbusOutputStream) + { + for(auto attributeDispatcherIterator = stubAttributeTable_.begin(); attributeDispatcherIterator != stubAttributeTable_.end(); attributeDispatcherIterator++) { //To prevent the destruction of the stub whilst still handling a message if (stub_) { - StubDispatcher* getterDispatcher = static_cast<StubDispatcher*>(attributeDispatcherIterator->second.getter); + StubDispatcher<StubClass_>* getterDispatcher = attributeDispatcherIterator->second.getter; if (NULL == getterDispatcher) { // all attributes have at least a getter COMMONAPI_ERROR(std::string(__FUNCTION__), "getterDispatcher == NULL"); - break; + return false; } else { dbusOutputStream.align(8); dbusOutputStream << attributeDispatcherIterator->first; - getterDispatcher->appendGetAllReply(dbusMessage, stub_, *this, dbusOutputStream); + getterDispatcher->appendGetAllReply(dbusMessage, stub_, dbusOutputStream); } } } + return DBusStubAdapterHelper<Stubs_...>::appendGetAllReply(dbusMessage, dbusOutputStream); + } - dbusOutputStream.endWriteMap(); - dbusOutputStream.flush(); + private: + + template <typename Stub_> + void addStubDispatcher(DBusInterfaceMemberPath _dbusInterfaceMemberPath, + StubDispatcher<Stub_>* _stubDispatcher, + identity<Stub_>) { + DBusStubAdapterHelper<Stubs_...>::addStubDispatcher(_dbusInterfaceMemberPath, _stubDispatcher); + + } + + void addStubDispatcher(DBusInterfaceMemberPath _dbusInterfaceMemberPath, + StubDispatcher<StubClass_>* _stubDispatcher, + identity<StubClass_>) { + stubDispatcherTable_.insert({_dbusInterfaceMemberPath, _stubDispatcher}); + + } + + template <typename Stub_> + void addAttributeDispatcher(std::string _key, + StubDispatcher<Stub_>* _stubDispatcherGetter, + StubDispatcher<Stub_>* _stubDispatcherSetter, + identity<Stub_>) { + DBusStubAdapterHelper<Stubs_...>::addAttributeDispatcher(_key, _stubDispatcherGetter, _stubDispatcherSetter); + + } + + void addAttributeDispatcher(std::string _key, + StubDispatcher<StubClass_>* _stubDispatcherGetter, + StubDispatcher<StubClass_>* _stubDispatcherSetter, + identity<StubClass_>) { + stubAttributeTable_.insert({_key, {_stubDispatcherGetter, _stubDispatcherSetter}}); + } + + bool handleFreedesktopGetAll(const DBusMessage& dbusMessage, DBusInputStream& dbusInputStream) { + std::string interfaceName; + dbusInputStream >> interfaceName; + + if(dbusInputStream.hasError()) { + return false; + } + + DBusMessage dbusMessageReply = dbusMessage.createMethodReturn("a{sv}"); + DBusOutputStream dbusOutputStream(dbusMessageReply); + + dbusOutputStream.beginWriteMap(); + appendGetAllReply(dbusMessage, dbusOutputStream); + dbusOutputStream.endWriteMap(); + dbusOutputStream.flush(); + + return getDBusConnection()->sendDBusMessage(dbusMessageReply); + } - return getDBusConnection()->sendDBusMessage(dbusMessageReply); - } }; template< class > @@ -313,9 +439,10 @@ template < template <class...> class In_, class... InArgs_, template <class...> class DeplIn_, class... DeplIn_Args> -class DBusMethodStubDispatcher<StubClass_, In_<InArgs_...>, DeplIn_<DeplIn_Args...> >: public DBusStubAdapterHelper<StubClass_>::StubDispatcher { +class DBusMethodStubDispatcher<StubClass_, In_<InArgs_...>, DeplIn_<DeplIn_Args...> >: public StubDispatcher<StubClass_> { public: - typedef DBusStubAdapterHelper<StubClass_> DBusStubAdapterHelperType; + + typedef typename StubClass_::RemoteEventHandlerType RemoteEventHandlerType; typedef void (StubClass_::*StubFunctor_)(std::shared_ptr<CommonAPI::ClientId>, InArgs_...); DBusMethodStubDispatcher(StubFunctor_ stubFunctor, std::tuple<DeplIn_Args*...> _in): @@ -323,8 +450,12 @@ class DBusMethodStubDispatcher<StubClass_, In_<InArgs_...>, DeplIn_<DeplIn_Args. initialize(typename make_sequence_range<sizeof...(DeplIn_Args), 0>::type(), _in); } - bool dispatchDBusMessage(const DBusMessage& dbusMessage, const std::shared_ptr<StubClass_>& stub, DBusStubAdapterHelperType& dbusStubAdapterHelper) { - return handleDBusMessage(dbusMessage, stub, dbusStubAdapterHelper, typename make_sequence_range<sizeof...(InArgs_), 0>::type()); + bool dispatchDBusMessage(const DBusMessage& dbusMessage, const std::shared_ptr<StubClass_>& stub, + RemoteEventHandlerType* _remoteEventHandler, + std::weak_ptr<DBusProxyConnection> _connection) { + (void) _remoteEventHandler; + (void) _connection; + return handleDBusMessage(dbusMessage, stub, typename make_sequence_range<sizeof...(InArgs_), 0>::type()); } private: @@ -336,9 +467,7 @@ class DBusMethodStubDispatcher<StubClass_, In_<InArgs_...>, DeplIn_<DeplIn_Args. template <int... InArgIndices_> inline bool handleDBusMessage(const DBusMessage& dbusMessage, const std::shared_ptr<StubClass_>& stub, - DBusStubAdapterHelperType& dbusStubAdapterHelper, index_sequence<InArgIndices_...>) { - (void)dbusStubAdapterHelper; if (sizeof...(InArgs_) > 0) { DBusInputStream dbusInputStream(dbusMessage); @@ -375,15 +504,15 @@ class DBusMethodWithReplyStubDispatcher< Out_<OutArgs_...>, DeplIn_<DeplIn_Args...>, DeplOut_<DeplOutArgs_...> >: - public DBusStubAdapterHelper<StubClass_>::StubDispatcher { + public StubDispatcher<StubClass_> { public: - typedef DBusStubAdapterHelper<StubClass_> DBusStubAdapterHelperType; + typedef typename StubClass_::RemoteEventHandlerType RemoteEventHandlerType; typedef std::function<void (OutArgs_...)> ReplyType_t; typedef void (StubClass_::*StubFunctor_)( std::shared_ptr<CommonAPI::ClientId>, InArgs_..., ReplyType_t); DBusMethodWithReplyStubDispatcher(StubFunctor_ stubFunctor, - const char* dbusReplySignature, + const char* dbusReplySignature, std::tuple<DeplIn_Args*...> _inDepArgs, std::tuple<DeplOutArgs_*...> _outDepArgs): stubFunctor_(stubFunctor), @@ -395,19 +524,20 @@ class DBusMethodWithReplyStubDispatcher< } - bool dispatchDBusMessage(const DBusMessage& dbusMessage, + bool dispatchDBusMessage(const DBusMessage& dbusMessage, const std::shared_ptr<StubClass_>& stub, - DBusStubAdapterHelperType& dbusStubAdapterHelper) { - connection_ = dbusStubAdapterHelper.getDBusConnection(); + RemoteEventHandlerType* _remoteEventHandler, + std::weak_ptr<DBusProxyConnection> _connection) { + (void) _remoteEventHandler; + connection_ = _connection; return handleDBusMessage( dbusMessage, stub, - dbusStubAdapterHelper, typename make_sequence_range<sizeof...(InArgs_), 0>::type(), typename make_sequence_range<sizeof...(OutArgs_), 0>::type()); } - bool sendReply(CommonAPI::CallId_t _call, + bool sendReply(CommonAPI::CallId_t _call, std::tuple<CommonAPI::Deployable<OutArgs_, DeplOutArgs_>...> args = std::make_tuple()) { return sendReplyInternal(_call, typename make_sequence_range<sizeof...(OutArgs_), 0>::type(), args); } @@ -419,14 +549,11 @@ private: in_ = std::make_tuple(std::get<DeplIn_ArgIndices>(_in)...); } - template <int... InArgIndices_, int... OutArgIndices_> inline bool handleDBusMessage(const DBusMessage& dbusMessage, const std::shared_ptr<StubClass_>& stub, - DBusStubAdapterHelperType& dbusStubAdapterHelper, index_sequence<InArgIndices_...>, index_sequence<OutArgIndices_...>) { - (void)dbusStubAdapterHelper; if (sizeof...(DeplIn_Args) > 0) { DBusInputStream dbusInputStream(dbusMessage); const bool success = DBusSerializableArguments<CommonAPI::Deployable<InArgs_, DeplIn_Args>...>::deserialize(dbusInputStream, std::get<InArgIndices_>(in_)...); @@ -508,23 +635,28 @@ template < template <class...> class In_, class... InArgs_, template <class...> class Out_, class... OutArgs_> class DBusMethodWithReplyAdapterDispatcher<StubClass_, StubAdapterClass_, In_<InArgs_...>, Out_<OutArgs_...> >: - public DBusStubAdapterHelper<StubClass_>::StubDispatcher { + public StubDispatcher<StubClass_> { public: - typedef DBusStubAdapterHelper<StubClass_> DBusStubAdapterHelperType; + typedef typename StubClass_::RemoteEventHandlerType RemoteEventHandlerType; typedef void (StubAdapterClass_::*StubFunctor_)(std::shared_ptr<CommonAPI::ClientId>, InArgs_..., OutArgs_&...); - typedef typename CommonAPI::Stub<typename DBusStubAdapterHelperType::StubAdapterType, typename StubClass_::RemoteEventType> StubType; + typedef typename CommonAPI::Stub<typename StubClass_::StubAdapterType, typename StubClass_::RemoteEventType> StubType; DBusMethodWithReplyAdapterDispatcher(StubFunctor_ stubFunctor, const char* dbusReplySignature): stubFunctor_(stubFunctor), dbusReplySignature_(dbusReplySignature) { } - bool dispatchDBusMessage(const DBusMessage& dbusMessage, const std::shared_ptr<StubClass_>& stub, DBusStubAdapterHelperType& dbusStubAdapterHelper) { + bool dispatchDBusMessage(const DBusMessage& dbusMessage, const std::shared_ptr<StubClass_>& stub, + RemoteEventHandlerType* _remoteEventHandler, + std::weak_ptr<DBusProxyConnection> _connection) { + + (void)_remoteEventHandler; + std::tuple<InArgs_..., OutArgs_...> argTuple; return handleDBusMessage( dbusMessage, stub, - dbusStubAdapterHelper, + _connection, typename make_sequence_range<sizeof...(InArgs_), 0>::type(), typename make_sequence_range<sizeof...(OutArgs_), sizeof...(InArgs_)>::type(),argTuple); } @@ -533,7 +665,7 @@ class DBusMethodWithReplyAdapterDispatcher<StubClass_, StubAdapterClass_, In_<In template <int... InArgIndices_, int... OutArgIndices_> inline bool handleDBusMessage(const DBusMessage& dbusMessage, const std::shared_ptr<StubClass_>& stub, - DBusStubAdapterHelperType& dbusStubAdapterHelper, + std::weak_ptr<DBusProxyConnection> _connection, index_sequence<InArgIndices_...>, index_sequence<OutArgIndices_...>, std::tuple<InArgs_..., OutArgs_...> argTuple) const { @@ -559,8 +691,13 @@ class DBusMethodWithReplyAdapterDispatcher<StubClass_, StubAdapterClass_, In_<In dbusOutputStream.flush(); } - - return dbusStubAdapterHelper.getDBusConnection()->sendDBusMessage(dbusMessageReply); + if (std::shared_ptr<DBusProxyConnection> connection = _connection.lock()) { + bool isSuccessful = connection->sendDBusMessage(dbusMessageReply); + return isSuccessful; + } + else { + return false; + } } StubFunctor_ stubFunctor_; @@ -569,9 +706,9 @@ class DBusMethodWithReplyAdapterDispatcher<StubClass_, StubAdapterClass_, In_<In template <typename StubClass_, typename AttributeType_, typename AttributeDepl_ = EmptyDeployment> -class DBusGetAttributeStubDispatcher: public virtual DBusStubAdapterHelper<StubClass_>::StubDispatcher { +class DBusGetAttributeStubDispatcher: public virtual StubDispatcher<StubClass_> { public: - typedef DBusStubAdapterHelper<StubClass_> DBusStubAdapterHelperType; + typedef typename StubClass_::RemoteEventHandlerType RemoteEventHandlerType; typedef const AttributeType_& (StubClass_::*GetStubFunctor)(std::shared_ptr<CommonAPI::ClientId>); DBusGetAttributeStubDispatcher(GetStubFunctor _getStubFunctor, const char *_signature, AttributeDepl_ *_depl = nullptr): @@ -582,12 +719,14 @@ class DBusGetAttributeStubDispatcher: public virtual DBusStubAdapterHelper<StubC virtual ~DBusGetAttributeStubDispatcher() {}; - bool dispatchDBusMessage(const DBusMessage& dbusMessage, const std::shared_ptr<StubClass_>& stub, DBusStubAdapterHelperType& dbusStubAdapterHelper) { - return sendAttributeValueReply(dbusMessage, stub, dbusStubAdapterHelper); + bool dispatchDBusMessage(const DBusMessage& dbusMessage, const std::shared_ptr<StubClass_>& stub, + RemoteEventHandlerType* _remoteEventHandler, + std::weak_ptr<DBusProxyConnection> _connection) { + (void) _remoteEventHandler; + return sendAttributeValueReply(dbusMessage, stub, _connection); } - void appendGetAllReply(const DBusMessage& dbusMessage, const std::shared_ptr<StubClass_>& stub, DBusStubAdapterHelperType& dbusStubAdapterHelper, DBusOutputStream &_output) { - (void)dbusStubAdapterHelper; + void appendGetAllReply(const DBusMessage& dbusMessage, const std::shared_ptr<StubClass_>& stub, DBusOutputStream &_output) { std::shared_ptr<DBusClientId> clientId = std::make_shared<DBusClientId>(std::string(dbusMessage.getSender())); auto varDepl = CommonAPI::DBus::VariantDeployment<AttributeDepl_>(true, depl_); // presuming FreeDesktop variant deployment, as support for "legacy" service only @@ -596,7 +735,7 @@ class DBusGetAttributeStubDispatcher: public virtual DBusStubAdapterHelper<StubC } protected: - virtual bool sendAttributeValueReply(const DBusMessage& dbusMessage, const std::shared_ptr<StubClass_>& stub, DBusStubAdapterHelperType& dbusStubAdapterHelper) { + virtual bool sendAttributeValueReply(const DBusMessage& dbusMessage, const std::shared_ptr<StubClass_>& stub, std::weak_ptr<DBusProxyConnection> connection_) { DBusMessage dbusMessageReply = dbusMessage.createMethodReturn(signature_); DBusOutputStream dbusOutputStream(dbusMessageReply); @@ -604,8 +743,13 @@ class DBusGetAttributeStubDispatcher: public virtual DBusStubAdapterHelper<StubC dbusOutputStream << CommonAPI::Deployable<AttributeType_, AttributeDepl_>((stub.get()->*getStubFunctor_)(clientId), depl_); dbusOutputStream.flush(); - - return dbusStubAdapterHelper.getDBusConnection()->sendDBusMessage(dbusMessageReply); + if (std::shared_ptr<DBusProxyConnection> connection = connection_.lock()) { + bool isSuccessful = connection->sendDBusMessage(dbusMessageReply); + return isSuccessful; + } + else { + return false; + } } @@ -617,8 +761,7 @@ class DBusGetAttributeStubDispatcher: public virtual DBusStubAdapterHelper<StubC template <typename StubClass_, typename AttributeType_, typename AttributeDepl_ = EmptyDeployment> class DBusSetAttributeStubDispatcher: public virtual DBusGetAttributeStubDispatcher<StubClass_, AttributeType_, AttributeDepl_> { public: - typedef typename DBusGetAttributeStubDispatcher<StubClass_, AttributeType_, AttributeDepl_>::DBusStubAdapterHelperType DBusStubAdapterHelperType; - typedef typename DBusStubAdapterHelperType::RemoteEventHandlerType RemoteEventHandlerType; + typedef typename StubClass_::RemoteEventHandlerType RemoteEventHandlerType; typedef typename DBusGetAttributeStubDispatcher<StubClass_, AttributeType_, AttributeDepl_>::GetStubFunctor GetStubFunctor; typedef bool (RemoteEventHandlerType::*OnRemoteSetFunctor)(std::shared_ptr<CommonAPI::ClientId>, AttributeType_); @@ -636,14 +779,16 @@ class DBusSetAttributeStubDispatcher: public virtual DBusGetAttributeStubDispatc virtual ~DBusSetAttributeStubDispatcher() {}; - bool dispatchDBusMessage(const DBusMessage& dbusMessage, const std::shared_ptr<StubClass_>& stub, DBusStubAdapterHelperType& dbusStubAdapterHelper) { + bool dispatchDBusMessage(const DBusMessage& dbusMessage, const std::shared_ptr<StubClass_>& stub, + RemoteEventHandlerType* _remoteEventHandler, + std::weak_ptr<DBusProxyConnection> _connection) { bool attributeValueChanged; - if (!setAttributeValue(dbusMessage, stub, dbusStubAdapterHelper, attributeValueChanged)) + if (!setAttributeValue(dbusMessage, stub, _remoteEventHandler, _connection, attributeValueChanged)) return false; if (attributeValueChanged) - notifyOnRemoteChanged(dbusStubAdapterHelper); + notifyOnRemoteChanged(_remoteEventHandler); return true; } @@ -665,7 +810,8 @@ class DBusSetAttributeStubDispatcher: public virtual DBusGetAttributeStubDispatc inline bool setAttributeValue(const DBusMessage& dbusMessage, const std::shared_ptr<StubClass_>& stub, - DBusStubAdapterHelperType& dbusStubAdapterHelper, + RemoteEventHandlerType* _remoteEventHandler, + std::weak_ptr<DBusProxyConnection> _connection, bool& attributeValueChanged) { bool errorOccured; CommonAPI::Deployable<AttributeType_, AttributeDepl_> attributeValue( @@ -677,13 +823,13 @@ class DBusSetAttributeStubDispatcher: public virtual DBusGetAttributeStubDispatc std::shared_ptr<DBusClientId> clientId = std::make_shared<DBusClientId>(std::string(dbusMessage.getSender())); - attributeValueChanged = (dbusStubAdapterHelper.getRemoteEventHandler()->*onRemoteSetFunctor_)(clientId, std::move(attributeValue.getValue())); + attributeValueChanged = (_remoteEventHandler->*onRemoteSetFunctor_)(clientId, std::move(attributeValue.getValue())); - return this->sendAttributeValueReply(dbusMessage, stub, dbusStubAdapterHelper); + return this->sendAttributeValueReply(dbusMessage, stub, _connection); } - inline void notifyOnRemoteChanged(DBusStubAdapterHelperType& dbusStubAdapterHelper) { - (dbusStubAdapterHelper.getRemoteEventHandler()->*onRemoteChangedFunctor_)(); + inline void notifyOnRemoteChanged(RemoteEventHandlerType* _remoteEventHandler) { + (_remoteEventHandler->*onRemoteChangedFunctor_)(); } inline const AttributeType_& getAttributeValue(std::shared_ptr<CommonAPI::ClientId> clientId, const std::shared_ptr<StubClass_>& stub) { @@ -697,8 +843,8 @@ class DBusSetAttributeStubDispatcher: public virtual DBusGetAttributeStubDispatc template <typename StubClass_, typename AttributeType_, typename AttributeDepl_ = EmptyDeployment> class DBusSetObservableAttributeStubDispatcher: public virtual DBusSetAttributeStubDispatcher<StubClass_, AttributeType_, AttributeDepl_> { public: - typedef typename DBusSetAttributeStubDispatcher<StubClass_, AttributeType_, AttributeDepl_>::DBusStubAdapterHelperType DBusStubAdapterHelperType; - typedef typename DBusStubAdapterHelperType::StubAdapterType StubAdapterType; + typedef typename StubClass_::RemoteEventHandlerType RemoteEventHandlerType; + typedef typename StubClass_::StubAdapterType StubAdapterType; typedef typename DBusSetAttributeStubDispatcher<StubClass_, AttributeType_, AttributeDepl_>::GetStubFunctor GetStubFunctor; typedef typename DBusSetAttributeStubDispatcher<StubClass_, AttributeType_, AttributeDepl_>::OnRemoteSetFunctor OnRemoteSetFunctor; typedef typename DBusSetAttributeStubDispatcher<StubClass_, AttributeType_, AttributeDepl_>::OnRemoteChangedFunctor OnRemoteChangedFunctor; @@ -720,23 +866,25 @@ class DBusSetObservableAttributeStubDispatcher: public virtual DBusSetAttributeS virtual ~DBusSetObservableAttributeStubDispatcher() {}; - bool dispatchDBusMessage(const DBusMessage& dbusMessage, const std::shared_ptr<StubClass_>& stub, DBusStubAdapterHelperType& dbusStubAdapterHelper) { + bool dispatchDBusMessage(const DBusMessage& dbusMessage, const std::shared_ptr<StubClass_>& stub, + RemoteEventHandlerType* _remoteEventHandler, + std::weak_ptr<DBusProxyConnection> _connection) { bool attributeValueChanged; - if (!this->setAttributeValue(dbusMessage, stub, dbusStubAdapterHelper, attributeValueChanged)) + if (!this->setAttributeValue(dbusMessage, stub, _remoteEventHandler, _connection, attributeValueChanged)) return false; if (attributeValueChanged) { std::shared_ptr<DBusClientId> clientId = std::make_shared<DBusClientId>(std::string(dbusMessage.getSender())); - fireAttributeValueChanged(clientId, dbusStubAdapterHelper, stub); - this->notifyOnRemoteChanged(dbusStubAdapterHelper); + fireAttributeValueChanged(clientId, _remoteEventHandler, stub); + this->notifyOnRemoteChanged(_remoteEventHandler); } return true; } protected: virtual void fireAttributeValueChanged(std::shared_ptr<CommonAPI::ClientId> _client, - DBusStubAdapterHelperType &_helper, + RemoteEventHandlerType* _remoteEventHandler, const std::shared_ptr<StubClass_> _stub) { - (void)_helper; + (void)_remoteEventHandler; (_stub->StubType::getStubAdapter().get()->*fireChangedFunctor_)(this->getAttributeValue(_client, _stub)); } @@ -747,4 +895,3 @@ protected: } // namespace CommonAPI #endif // COMMONAPI_DBUS_DBUSSTUBADAPTERHELPER_HPP_ - diff --git a/src/CommonAPI/DBus/DBusAddressTranslator.cpp b/src/CommonAPI/DBus/DBusAddressTranslator.cpp index cb208c0..8f67aa2 100644 --- a/src/CommonAPI/DBus/DBusAddressTranslator.cpp +++ b/src/CommonAPI/DBus/DBusAddressTranslator.cpp @@ -67,9 +67,10 @@ DBusAddressTranslator::translate(const CommonAPI::Address &_key, DBusAddress &_v _value = it->second; } else if (isDefault_) { std::string interfaceName(_key.getInterface()); + std::replace(interfaceName.begin(), interfaceName.end(), ':', '.'); std::string objectPath("/" + _key.getInstance()); std::replace(objectPath.begin(), objectPath.end(), '.', '/'); - std::string service(_key.getInterface() + "_" + _key.getInstance()); + std::string service(interfaceName + "_" + _key.getInstance()); if (isValid(service, '.', false, false, true) && isValid(objectPath, '/', true) @@ -81,6 +82,11 @@ DBusAddressTranslator::translate(const CommonAPI::Address &_key, DBusAddress &_v forwards_.insert({ _key, _value }); backwards_.insert({ _value, _key }); } + else { + COMMONAPI_ERROR( + "Translation from CommonAPI address to DBus address failed!"); + result = false; + } } else { result = false; } @@ -101,6 +107,9 @@ DBusAddressTranslator::translate(const DBusAddress &_key, std::string &_value) { bool DBusAddressTranslator::translate(const DBusAddress &_key, CommonAPI::Address &_value) { bool result(true); + std::size_t itsInterfacePos; + std::string itsVersion; + bool isValidVersion(true); std::lock_guard<std::mutex> itsLock(mutex_); const auto it = backwards_.find(_key); @@ -109,6 +118,34 @@ DBusAddressTranslator::translate(const DBusAddress &_key, CommonAPI::Address &_v } else if (isDefault_) { if (isValid(_key.getObjectPath(), '/', true) && isValid(_key.getInterface(), '.')) { std::string interfaceName(_key.getInterface()); + itsInterfacePos = interfaceName.rfind('.'); + itsInterfacePos++; + if( itsInterfacePos != std::string::npos + && ( interfaceName.length() - itsInterfacePos >= 4) ) { + itsVersion = interfaceName.substr(itsInterfacePos); + if( itsVersion != "" ) { + std::size_t itsSeparatorPos = itsVersion.find('_'); + if (itsSeparatorPos == std::string::npos) { + isValidVersion = false; + } + if(isValidVersion) { + if( *(itsVersion.begin()) != 'v') { + isValidVersion = false; + } + if(isValidVersion) { + for (auto it = itsVersion.begin()+1; it != itsVersion.end(); ++it) { + if (!isdigit(*it) && *it != '_') { + isValidVersion = false; + break; + } + } + } + if( isValidVersion ) { + interfaceName.replace(itsInterfacePos - 1, 1, ":"); + } + } + } + } std::string instance(_key.getObjectPath().substr(1)); std::replace(instance.begin(), instance.end(), '/', '.'); diff --git a/src/CommonAPI/DBus/DBusConnection.cpp b/src/CommonAPI/DBus/DBusConnection.cpp index 6071830..23967a9 100644 --- a/src/CommonAPI/DBus/DBusConnection.cpp +++ b/src/CommonAPI/DBus/DBusConnection.cpp @@ -20,6 +20,22 @@ namespace CommonAPI { namespace DBus { +void MsgReplyQueueEntry::process(std::shared_ptr<DBusConnection> _connection) { + _connection->dispatchDBusMessageReply(message_, replyAsyncHandler_); +} + +void MsgReplyQueueEntry::clear() { + delete replyAsyncHandler_; +} + +void MsgQueueEntry::process(std::shared_ptr<DBusConnection> _connection) { + (void)_connection; +} + +void MsgQueueEntry::clear() { + +} + DBusConnectionStatusEvent::DBusConnectionStatusEvent(DBusConnection* dbusConnection): dbusConnection_(dbusConnection) { } @@ -52,6 +68,7 @@ DBusConnection::DBusConnection(DBusType_t busType, dispatchThread_(NULL), dispatchSource_(), watchContext_(NULL), + timeoutContext_(NULL), connection_(NULL), busType_(busType), dbusConnectionStatusEvent_(this), @@ -74,6 +91,7 @@ DBusConnection::DBusConnection(::DBusConnection *_connection, dispatchThread_(NULL), dispatchSource_(), watchContext_(NULL), + timeoutContext_(NULL), connection_(_connection), busType_(DBusType_t::WRAPPED), dbusConnectionStatusEvent_(this), @@ -86,7 +104,7 @@ DBusConnection::DBusConnection(::DBusConnection *_connection, activeConnections_(0), isDisconnecting_(false), isDispatching_(false), - isWaitingOnFinishedDispatching_(false){ + isWaitingOnFinishedDispatching_(false) { dbus_threads_init_default(); } @@ -106,10 +124,11 @@ DBusConnection::~DBusConnection() { dbus_connection_set_timeout_functions(connection_, NULL, NULL, NULL, NULL, NULL); } - lockedContext->deregisterWatch(msgWatch_); - lockedContext->deregisterDispatchSource(msgDispatchSource_); + lockedContext->deregisterWatch(queueWatch_); + lockedContext->deregisterDispatchSource(queueDispatchSource_); lockedContext->deregisterDispatchSource(dispatchSource_); delete watchContext_; + delete timeoutContext_; } // ensure, the registry survives until disconnecting is done... @@ -170,21 +189,26 @@ DBusConnection::~DBusConnection() { } } - bool DBusConnection::attachMainLoopContext(std::weak_ptr<MainLoopContext> mainLoopContext) { mainLoopContext_ = mainLoopContext; if (auto lockedContext = mainLoopContext_.lock()) { - msgWatch_ = new DBusMessageWatch(shared_from_this()); - msgDispatchSource_ = new DBusMessageDispatchSource(msgWatch_); + queueWatch_ = new DBusQueueWatch(shared_from_this()); + queueDispatchSource_ = new DBusQueueDispatchSource(queueWatch_); - lockedContext->registerDispatchSource(msgDispatchSource_); - lockedContext->registerWatch(msgWatch_); + lockedContext->registerDispatchSource(queueDispatchSource_); + lockedContext->registerWatch(queueWatch_); dispatchSource_ = new DBusDispatchSource(this); watchContext_ = new WatchContext(mainLoopContext_, dispatchSource_, shared_from_this()); + timeoutContext_ = new TimeoutContext(mainLoopContext_, shared_from_this()); lockedContext->registerDispatchSource(dispatchSource_); + if (!isConnected()) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "not connected"); + return false; + } + dbus_connection_set_wakeup_main_function( connection_, &DBusConnection::onWakeupMainContext, @@ -208,7 +232,7 @@ bool DBusConnection::attachMainLoopContext(std::weak_ptr<MainLoopContext> mainLo &DBusConnection::onAddTimeout, &DBusConnection::onRemoveTimeout, &DBusConnection::onToggleTimeout, - &mainLoopContext_, + timeoutContext_, NULL); if (!success) { @@ -297,13 +321,13 @@ void DBusConnection::onToggleWatch(::DBusWatch* libdbusWatch, void* data) { dbus_bool_t DBusConnection::onAddTimeout(::DBusTimeout* libdbusTimeout, void* data) { - std::weak_ptr<MainLoopContext>* mainloop = static_cast<std::weak_ptr<MainLoopContext>*>(data); - if (NULL == mainloop) { - COMMONAPI_ERROR(std::string(__FUNCTION__), "mainloop == NULL"); + TimeoutContext* timeoutContext = static_cast<TimeoutContext*>(data); + if (NULL == timeoutContext) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "timeoutContext == NULL"); return FALSE; } - DBusTimeout* dbusTimeout = new DBusTimeout(libdbusTimeout, *mainloop); + DBusTimeout* dbusTimeout = new DBusTimeout(libdbusTimeout, timeoutContext->mainLoopContext_, timeoutContext->dbusConnection_); dbus_timeout_set_data(libdbusTimeout, dbusTimeout, NULL); if (dbusTimeout->isReadyToBeMonitored()) { @@ -364,11 +388,11 @@ bool DBusConnection::connect(DBusError &dbusError, bool startDispatchThread) { #ifdef _MSC_VER COMMONAPI_ERROR(std::string(__FUNCTION__) + ": Name: " + dbusError.getName() + - " Message: " + dbusError.getMessage()) + " Message: " + dbusError.getMessage()); #else COMMONAPI_ERROR(std::string(__PRETTY_FUNCTION__) + ": Name: " + dbusError.getName() + - " Message: " + dbusError.getMessage()) + " Message: " + dbusError.getMessage()); #endif return false; } @@ -404,8 +428,15 @@ void DBusConnection::disconnect() { std::lock_guard<std::mutex> dbusConnectionLock(connectionGuard_); std::unique_lock<std::mutex> dispatchLock(dispatchMutex_); + + std::shared_ptr<DBusServiceRegistry> itsRegistry = DBusServiceRegistry::get(shared_from_this()); + isDisconnecting_ = true; + if (std::shared_ptr<CommonAPI::MainLoopContext> mainLoopContext = mainLoopContext_.lock()) { + Factory::get()->releaseConnection(connectionId_); + } + if (isConnected()) { dbusConnectionStatusEvent_.notifyListeners(AvailabilityStatus::NOT_AVAILABLE); @@ -457,10 +488,6 @@ void DBusConnection::disconnect() { dbus_connection_unref(connection_); connection_ = nullptr; } - - if (std::shared_ptr<CommonAPI::MainLoopContext> mainLoopContext = mainLoopContext_.lock()) { - Factory::get()->releaseConnection(connectionId_, mainLoopContext.get()); - } } bool DBusConnection::isConnected() const { @@ -504,11 +531,11 @@ bool DBusConnection::requestServiceNameAndBlock(const std::string& serviceName) #ifdef _MSC_VER // Visual Studio COMMONAPI_ERROR(std::string(__FUNCTION__) + ": Name: " + dbusError.getName() + - " Message: " + dbusError.getMessage()) + " Message: " + dbusError.getMessage()); #else COMMONAPI_ERROR(std::string(__PRETTY_FUNCTION__) + ": Name: " + dbusError.getName() + - " Message: " + dbusError.getMessage()) + " Message: " + dbusError.getMessage()); #endif } } @@ -594,10 +621,11 @@ void DBusConnection::onLibdbusPendingCall(::DBusPendingCall* _libdbusPendingCall _dbusMessageReplyAsyncHandler->lock(); // libdbus calls the cleanup method below - if(_libdbusPendingCall) + if(_libdbusPendingCall && processAsyncHandler) { dbus_pending_call_unref(_libdbusPendingCall); + _dbusMessageReplyAsyncHandler->setExecutionFinished(); + } - _dbusMessageReplyAsyncHandler->setExecutionFinished(); if (_dbusMessageReplyAsyncHandler->hasToBeDeleted()) { _dbusMessageReplyAsyncHandler->unlock(); delete _dbusMessageReplyAsyncHandler; @@ -619,7 +647,6 @@ void DBusConnection::onLibdbusPendingCallNotifyThunk(::DBusPendingCall* _libdbus auto pendingCallNotificationData = reinterpret_cast<PendingCallNotificationData*>(_userData); auto dbusMessageReplyAsyncHandler = pendingCallNotificationData->replyAsyncHandler_; auto dbusConnection = pendingCallNotificationData->dbusConnection_; - delete pendingCallNotificationData; DBusMessage dbusMessage = DBusConnection::convertToDBusMessage(_libdbusPendingCall); @@ -627,9 +654,8 @@ void DBusConnection::onLibdbusPendingCallNotifyThunk(::DBusPendingCall* _libdbus } void DBusConnection::onLibdbusDataCleanup(void *_data) { - // Dummy method -> deleting of userData is not executed in this method. Deleting is - // executed by handling of the timeouts. - (void)_data; + auto pendingCallNotificationData = reinterpret_cast<PendingCallNotificationData*>(_data); + delete pendingCallNotificationData; } //Would not be needed if libdbus would actually handle its timeouts for pending calls. @@ -648,7 +674,7 @@ void DBusConnection::enforceAsynchronousTimeouts() const { auto minTimeout = std::get<0>(minTimeoutElement->second); - std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now(); + std::chrono::steady_clock::time_point now = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now(); timeout = (int)std::chrono::duration_cast<std::chrono::milliseconds>(minTimeout - now).count(); } @@ -662,7 +688,7 @@ void DBusConnection::enforceAsynchronousTimeouts() const { enforceTimeoutMutex_.lock(); auto it = timeoutMap_.begin(); while (it != timeoutMap_.end()) { - std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now(); + std::chrono::steady_clock::time_point now = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now(); DBusMessageReplyAsyncHandler* asyncHandler = std::get<1>(it->second); DBusPendingCall* libdbusPendingCall = it->first; @@ -683,7 +709,7 @@ void DBusConnection::enforceAsynchronousTimeouts() const { if (executionStarted && !executionFinished) { // execution of asyncHandler is still running // ==> add 100 ms for next timeout check - std::get<0>(it->second) = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(100); + std::get<0>(it->second) = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now() + std::chrono::milliseconds(100); } else { if (!executionFinished) { // execution of asyncHandler was not finished (and not started) @@ -729,6 +755,7 @@ void DBusConnection::enforceAsynchronousTimeouts() const { } enforceTimeoutMutex_.unlock(); } else { + std::lock_guard<std::mutex> itsLock(enforceTimeoutMutex_); auto it = timeoutMap_.begin(); @@ -768,18 +795,20 @@ void DBusConnection::enforceAsynchronousTimeouts() const { } } -std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync( +bool DBusConnection::sendDBusMessageWithReplyAsync( const DBusMessage& dbusMessage, std::unique_ptr<DBusMessageReplyAsyncHandler> dbusMessageReplyAsyncHandler, const CommonAPI::CallInfo *_info) const { + std::lock_guard<std::mutex> dbusConnectionLock(connectionGuard_); + if (!dbusMessage) { COMMONAPI_ERROR(std::string(__FUNCTION__), "message == NULL"); - return std::future<CallStatus>(); + return false; } if (!isConnected()) { COMMONAPI_ERROR(std::string(__FUNCTION__), "not connected"); - return std::future<CallStatus>(); + return false; } DBusPendingCall* libdbusPendingCall; @@ -787,14 +816,8 @@ std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync( DBusMessageReplyAsyncHandler* replyAsyncHandler = dbusMessageReplyAsyncHandler.release(); - std::future<CallStatus> callStatusFuture; - try { - callStatusFuture = replyAsyncHandler->getFuture(); - } catch (std::exception& e) { - (void)e; - } - PendingCallNotificationData* userData = new PendingCallNotificationData(this, replyAsyncHandler); + DBusTimeout::currentTimeout_ = NULL; libdbusSuccess = dbus_connection_send_with_reply_set_notify(connection_, dbusMessage.message_, @@ -811,10 +834,10 @@ std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync( if (!libdbusSuccess || !libdbusPendingCall) { #ifdef _MSC_VER // Visual Studio COMMONAPI_ERROR(std::string(__FUNCTION__) + - ": (!libdbusSuccess || !libdbusPendingCall) == true") + ": (!libdbusSuccess || !libdbusPendingCall) == true"); #else COMMONAPI_ERROR(std::string(__PRETTY_FUNCTION__) + - ": (!libdbusSuccess || !libdbusPendingCall) == true") + ": (!libdbusSuccess || !libdbusPendingCall) == true"); #endif if (libdbusPendingCall) { dbus_pending_call_unref(libdbusPendingCall); @@ -827,13 +850,13 @@ std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync( nullptr)); } mainLoopContext_.lock()->wakeup(); - return callStatusFuture; + return true; } if (_info->timeout_ != DBUS_TIMEOUT_INFINITE) { - auto timeoutPoint = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(_info->timeout_); + auto timeoutPoint = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now() + std::chrono::milliseconds(_info->timeout_); std::tuple< - std::chrono::time_point<std::chrono::high_resolution_clock>, + std::chrono::steady_clock::time_point, DBusMessageReplyAsyncHandler*, DBusMessage> toInsert { timeoutPoint, @@ -841,11 +864,20 @@ std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync( dbusMessage }; + if(DBusTimeout::currentTimeout_) + DBusTimeout::currentTimeout_->setPendingCall(libdbusPendingCall); + enforceTimeoutMutex_.lock(); auto ret = timeoutMap_.insert( { libdbusPendingCall, toInsert } ); if (ret.second == false) { // key has been reused // update the map value with the new info + DBusMessageReplyAsyncHandler* asyncHandler; + auto it = timeoutMap_.find(ret.first->first); + if(it != timeoutMap_.end()) { + asyncHandler = std::get<1>(it->second); + delete asyncHandler; + } timeoutMap_.erase(ret.first); timeoutMap_.insert( { libdbusPendingCall, toInsert } ); } @@ -860,7 +892,7 @@ std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync( timeoutInfiniteAsyncHandlers_.insert(replyAsyncHandler); } - return callStatusFuture; + return true; } DBusMessage DBusConnection::sendDBusMessageWithReplyAndBlock(const DBusMessage& dbusMessage, @@ -902,17 +934,24 @@ void DBusConnection::dispatchDBusMessageReply(const DBusMessage& _reply, } bool DBusConnection::singleDispatch() { + std::list<MainloopTimeout_t> mainloopTimeouts; { - std::lock_guard<std::mutex> itsLock(mainloopTimeoutsMutex_); + mainloopTimeoutsMutex_.lock(); for (auto t : mainloopTimeouts_) { - std::get<0>(t)->onDBusMessageReply(std::get<2>(t), std::get<1>(t)); - if (std::get<3>(t) != nullptr) { - dbus_pending_call_unref(std::get<3>(t)); - } - delete std::get<0>(t); + mainloopTimeouts.push_back(t); } mainloopTimeouts_.clear(); + mainloopTimeoutsMutex_.unlock(); } + + for (auto t : mainloopTimeouts) { + std::get<0>(t)->onDBusMessageReply(std::get<2>(t), std::get<1>(t)); + if (std::get<3>(t) != nullptr) { + dbus_pending_call_unref(std::get<3>(t)); + } + delete std::get<0>(t); + } + if(setDispatching(true)) { bool dispatchStatus(connection_ && dbus_connection_dispatch(connection_) == DBUS_DISPATCH_DATA_REMAINS); setDispatching(false); @@ -948,10 +987,13 @@ void DBusConnection::incrementConnection() { } void DBusConnection::decrementConnection() { - std::lock_guard < std::mutex > lock(activeConnectionsMutex_); - activeConnections_--; + int activeConnections = 0; + { + std::lock_guard < std::mutex > lock(activeConnectionsMutex_); + activeConnections = --activeConnections_; + } - if (activeConnections_ <= 0) { + if (activeConnections <= 0) { disconnect(); } } @@ -978,19 +1020,25 @@ bool DBusConnection::setDispatching(bool _isDispatching) { } } -void DBusConnection::sendPendingSelectiveSubscription(DBusProxy* callingProxy, std::string methodName, - DBusSignalHandler* dbusSignalHandler, uint32_t tag) { +void DBusConnection::sendPendingSelectiveSubscription(DBusProxy* callingProxy, std::string interfaceMemberName, + DBusSignalHandler* dbusSignalHandler, uint32_t tag, std::string interfaceMemberSignature) { bool outarg; + std::string methodName = "subscribeFor" + interfaceMemberName + "Selective"; DBusProxyHelper<CommonAPI::DBus::DBusSerializableArguments<>, CommonAPI::DBus::DBusSerializableArguments<bool>>::callMethodAsync( *callingProxy, methodName.c_str(), "", &CommonAPI::DBus::defaultCallInfo, - [this, dbusSignalHandler, callingProxy, tag] + [this, dbusSignalHandler, callingProxy, tag, interfaceMemberName, interfaceMemberSignature] (const CommonAPI::CallStatus& callStatus, const bool& accepted) { if (callStatus == CommonAPI::CallStatus::SUCCESS && accepted) { dbusSignalHandler->onSpecificError(CommonAPI::CallStatus::SUCCESS, tag); } else { + const DBusSignalHandlerPath itsToken(callingProxy->getDBusAddress().getObjectPath(), + callingProxy->getDBusAddress().getInterface(), + interfaceMemberName, + interfaceMemberSignature); + removeSignalMemberHandler(itsToken, dbusSignalHandler); dbusSignalHandler->onSpecificError(CommonAPI::CallStatus::SUBSCRIPTION_REFUSED, tag); } }, std::make_tuple(outarg)); @@ -1007,30 +1055,33 @@ void DBusConnection::subscribeForSelectiveBroadcast( std::string methodName = "subscribeFor" + interfaceMemberName + "Selective"; + DBusProxyConnection::DBusSignalHandlerToken token = addSignalMemberHandler( + objectPath, + interfaceName, + interfaceMemberName, + interfaceMemberSignature, + dbusSignalHandler, + true + ); + dbusSignalHandler->setSubscriptionToken(token, tag); + bool outarg; - DBusProxyHelper<CommonAPI::DBus::DBusSerializableArguments<>, - CommonAPI::DBus::DBusSerializableArguments<bool>>::callMethodAsync( - *callingProxy, methodName.c_str(), "", - &CommonAPI::DBus::defaultCallInfo, - [this, objectPath, interfaceName, interfaceMemberName, interfaceMemberSignature, dbusSignalHandler, callingProxy, tag] - (const CommonAPI::CallStatus& callStatus, const bool& accepted) { - if ((callStatus == CommonAPI::CallStatus::SUCCESS && accepted) || !callingProxy->isAvailable()) { - DBusProxyConnection::DBusSignalHandlerToken token = addSignalMemberHandler( - objectPath, - interfaceName, - interfaceMemberName, - interfaceMemberSignature, - dbusSignalHandler, - true - ); - dbusSignalHandler->setSubscriptionToken(token, tag); - } - if (accepted) { - dbusSignalHandler->onSpecificError(CommonAPI::CallStatus::SUCCESS, tag); - } else { - dbusSignalHandler->onSpecificError(CommonAPI::CallStatus::SUBSCRIPTION_REFUSED, tag); - } - }, std::make_tuple(outarg)); + if(callingProxy->isAvailable()) { + DBusProxyHelper<CommonAPI::DBus::DBusSerializableArguments<>, + CommonAPI::DBus::DBusSerializableArguments<bool>>::callMethodAsync( + *callingProxy, methodName.c_str(), "", + &CommonAPI::DBus::defaultCallInfo, + [this, objectPath, interfaceName, interfaceMemberName, interfaceMemberSignature, dbusSignalHandler, callingProxy, tag, token] + (const CommonAPI::CallStatus& callStatus, const bool& accepted) { + (void)callStatus; + if (accepted) { + dbusSignalHandler->onSpecificError(CommonAPI::CallStatus::SUCCESS, tag); + } else { + removeSignalMemberHandler(token, dbusSignalHandler); + dbusSignalHandler->onSpecificError(CommonAPI::CallStatus::SUBSCRIPTION_REFUSED, tag); + } + }, std::make_tuple(outarg)); + } } void DBusConnection::unsubscribeFromSelectiveBroadcast(const std::string& eventName, @@ -1585,6 +1636,9 @@ void notifyDBusOMSignalHandlers(DBusSignalHandlersTable& dbusSignalHandlerstable signalEntry->second.first->lock(); signalGuard_.unlock(); + // ensure, the registry survives + std::shared_ptr<DBusServiceRegistry> itsRegistry_ = DBusServiceRegistry::get(shared_from_this()); + notifyDBusSignalHandlers(dbusSignalHandlerTable_, signalEntry, dbusMessage, dbusHandlerResult); @@ -1668,14 +1722,27 @@ std::shared_ptr<DBusConnection> DBusConnection::wrap(::DBusConnection *_connecti return std::make_shared<DBusConnection>(_connection, _connectionId); } -void DBusConnection::pushDBusMessageReply(const DBusMessage& _reply, +void DBusConnection::pushDBusMessageReplyToMainLoop(const DBusMessage& _reply, std::unique_ptr<DBusMessageReplyAsyncHandler> _dbusMessageReplyAsyncHandler) { // push message to the message queue DBusMessageReplyAsyncHandler* replyAsyncHandler = _dbusMessageReplyAsyncHandler.release(); replyAsyncHandler->setHasToBeDeleted(); - std::shared_ptr<DBusMessageWatch::MsgReplyQueueEntry> msgReplyQueueEntry = std::make_shared<DBusMessageWatch::MsgReplyQueueEntry>( + std::shared_ptr<MsgReplyQueueEntry> msgReplyQueueEntry = std::make_shared<MsgReplyQueueEntry>( replyAsyncHandler, _reply); - msgWatch_->pushMsgQueue(msgReplyQueueEntry); + queueWatch_->pushQueue(msgReplyQueueEntry); +} + +void DBusConnection::setPendingCallTimedOut(DBusPendingCall* _pendingCall, ::DBusTimeout* _timeout) const { + std::lock_guard<std::mutex> lock(enforceTimeoutMutex_); + auto it = timeoutMap_.find(_pendingCall); + if(it != timeoutMap_.end()) { + auto replyAsyncHandler = std::get<1>(it->second); + replyAsyncHandler->lock(); + if(!replyAsyncHandler->getTimeoutOccurred()) { + dbus_timeout_handle(_timeout); + } + replyAsyncHandler->unlock(); + } } } // namespace DBus diff --git a/src/CommonAPI/DBus/DBusDaemonProxy.cpp b/src/CommonAPI/DBus/DBusDaemonProxy.cpp index 58ee08f..3b5b29f 100644 --- a/src/CommonAPI/DBus/DBusDaemonProxy.cpp +++ b/src/CommonAPI/DBus/DBusDaemonProxy.cpp @@ -62,7 +62,15 @@ bool DBusDaemonProxy::isAvailableBlocking() const { std::future<AvailabilityStatus> DBusDaemonProxy::isAvailableAsync( isAvailableAsyncCallback _callback, const CallInfo *_info) const { - return isAvailableAsync(_callback, _info); + (void)_callback; + (void)_info; + std::promise<AvailabilityStatus> promise; + if(isAvailable()) { + promise.set_value(CommonAPI::AvailabilityStatus::AVAILABLE); + } else { + promise.set_value(CommonAPI::AvailabilityStatus::NOT_AVAILABLE); + } + return promise.get_future(); } ProxyStatusEvent& DBusDaemonProxy::getProxyStatusEvent() { diff --git a/src/CommonAPI/DBus/DBusFactory.cpp b/src/CommonAPI/DBus/DBusFactory.cpp index d7fa84c..1c05455 100644 --- a/src/CommonAPI/DBus/DBusFactory.cpp +++ b/src/CommonAPI/DBus/DBusFactory.cpp @@ -47,7 +47,9 @@ Factory::init() { void Factory::registerInterface(InterfaceInitFunction _function) { +#ifndef WIN32 std::lock_guard<std::mutex> itsLock(initializerMutex_); +#endif if (isInitialized_) { // We are already running --> initialize the interface library! _function(); @@ -79,11 +81,15 @@ Factory::createProxy( DBusAddress dbusAddress; if (DBusAddressTranslator::get()->translate(address, dbusAddress)) { - std::shared_ptr<DBusProxy> proxy - = proxyCreateFunctionsIterator->second(dbusAddress, getConnection(_connectionId)); - if (proxy) - proxy->init(); - return proxy; + std::shared_ptr<DBusConnection> connection + = getConnection(_connectionId); + if (connection) { + std::shared_ptr<DBusProxy> proxy + = proxyCreateFunctionsIterator->second(dbusAddress, connection); + if (proxy) + proxy->init(); + return proxy; + } } } return nullptr; @@ -100,11 +106,15 @@ Factory::createProxy( DBusAddress dbusAddress; if (DBusAddressTranslator::get()->translate(address, dbusAddress)) { - std::shared_ptr<DBusProxy> proxy - = proxyCreateFunctionsIterator->second(dbusAddress, getConnection(_context)); - if (proxy) - proxy->init(); - return proxy; + std::shared_ptr<DBusConnection> connection + = getConnection(_context); + if (connection) { + std::shared_ptr<DBusProxy> proxy + = proxyCreateFunctionsIterator->second(dbusAddress, connection); + if (proxy) + proxy->init(); + return proxy; + } } } @@ -120,11 +130,15 @@ Factory::registerStub( CommonAPI::Address address(_domain, _interface, _instance); DBusAddress dbusAddress; if (DBusAddressTranslator::get()->translate(address, dbusAddress)) { - std::shared_ptr<DBusStubAdapter> adapter - = stubAdapterCreateFunctionsIterator->second(dbusAddress, getConnection(_connectionId), _stub); - if (adapter) { - adapter->init(adapter); - return registerStubAdapter(adapter); + std::shared_ptr<DBusConnection> connection = getConnection(_connectionId); + if (connection) { + std::shared_ptr<DBusStubAdapter> adapter + = stubAdapterCreateFunctionsIterator->second(dbusAddress, connection, _stub); + if (adapter) { + adapter->init(adapter); + if (registerStubAdapter(adapter)) + return true; + } } } } @@ -141,11 +155,15 @@ Factory::registerStub( CommonAPI::Address address(_domain, _interface, _instance); DBusAddress dbusAddress; if (DBusAddressTranslator::get()->translate(address, dbusAddress)) { - std::shared_ptr<DBusStubAdapter> adapter - = stubAdapterCreateFunctionsIterator->second(dbusAddress, getConnection(_context), _stub); - if (adapter) { - adapter->init(adapter); - return registerStubAdapter(adapter); + std::shared_ptr<DBusConnection> connection = getConnection(_context); + if (connection) { + std::shared_ptr<DBusStubAdapter> adapter + = stubAdapterCreateFunctionsIterator->second(dbusAddress, connection, _stub); + if (adapter) { + adapter->init(adapter); + if (registerStubAdapter(adapter)) + return true; + } } } } @@ -175,8 +193,10 @@ Factory::unregisterStub(const std::string &_domain, const std::string &_interfac services_.erase(adapterResult->first); + decrementConnection(connection); + return true; - } + } return false; } @@ -245,20 +265,29 @@ Factory::unregisterStubAdapter(std::shared_ptr<DBusStubAdapter> _adapter) { /////////////////////////////////////////////////////////////////////////////// std::shared_ptr<DBusConnection> Factory::getConnection(const ConnectionId_t &_connectionId) { - - std::lock_guard<std::mutex> itsGuard(connectionsMutex_); + std::lock_guard<std::recursive_mutex> itsConnectionGuard(connectionsMutex_); + std::shared_ptr<DBusConnection> itsConnection; auto itsConnectionIterator = connections_.find(_connectionId); - if (itsConnectionIterator != connections_.end()) { - return itsConnectionIterator->second; + if (itsConnectionIterator != connections_.end()) + itsConnection = itsConnectionIterator->second; + + if(!itsConnection) { + // No connection found, lets create and initialize one + const DBusType_t dbusType = DBusAddressTranslator::get()->getDBusBusType(_connectionId); + itsConnection = std::make_shared<DBusConnection>(dbusType, _connectionId); + + if (itsConnection) { + if (!itsConnection->connect(true)) { + COMMONAPI_ERROR("Failed to create connection ", _connectionId); + itsConnection.reset(); + } else { + connections_.insert({ _connectionId, itsConnection } ); + } + } } - // No connection found, lets create and initialize one - const DBusType_t dbusType = DBusAddressTranslator::get()->getDBusBusType(_connectionId); - std::shared_ptr<DBusConnection> itsConnection - = std::make_shared<DBusConnection>(dbusType, _connectionId); - connections_.insert({ _connectionId, itsConnection }); + incrementConnection(itsConnection); - itsConnection->connect(true); return itsConnection; } @@ -267,21 +296,29 @@ Factory::getConnection(std::shared_ptr<MainLoopContext> _context) { if (!_context) return getConnection(DEFAULT_CONNECTION_ID); - std::lock_guard<std::mutex> itsGuard(contextConnectionsMutex_); - auto itsConnectionIterator = contextConnections_.find(_context.get()); - if (itsConnectionIterator != contextConnections_.end()) { - return itsConnectionIterator->second; + std::lock_guard<std::recursive_mutex> itsConnectionGuard(connectionsMutex_); + std::shared_ptr<DBusConnection> itsConnection; + auto itsConnectionIterator = connections_.find(_context->getName()); + if (itsConnectionIterator != connections_.end()) + itsConnection = itsConnectionIterator->second; + + if(!itsConnection) { + // No connection found, lets create and initialize one + const DBusType_t dbusType = DBusAddressTranslator::get()->getDBusBusType(_context->getName()); + itsConnection = std::make_shared<DBusConnection>(dbusType, _context->getName()); + + if (itsConnection) { + if (!itsConnection->connect(false)) { + itsConnection.reset(); + } else { + connections_.insert({ _context->getName(), itsConnection } ); + itsConnection->attachMainLoopContext(_context); + } + } } - // No connection found, lets create and initialize one - const DBusType_t dbusType = DBusAddressTranslator::get()->getDBusBusType(_context->getName()); - std::shared_ptr<DBusConnection> itsConnection - = std::make_shared<DBusConnection>(dbusType, _context->getName()); - contextConnections_.insert({ _context.get(), itsConnection } ); - - itsConnection->connect(false); - if (_context) - itsConnection->attachMainLoopContext(_context); + if (itsConnection) + incrementConnection(itsConnection); return itsConnection; } @@ -343,9 +380,11 @@ Factory::registerManagedService(const std::shared_ptr<DBusStubAdapter> &_stubAda services_.erase(insertResult.first); } + if(isAcquired) + incrementConnection(_stubAdapter->getDBusConnection()); + return isAcquired; } - return false; } @@ -370,90 +409,47 @@ Factory::unregisterManagedService(const ServicesMap::iterator &iterator) { if (isUnregistered) { connection->releaseServiceName(serviceName); services_.erase(iterator); + decrementConnection(connection); } // TODO: log error return isUnregistered; } void Factory::incrementConnection(std::shared_ptr<DBusProxyConnection> _connection) { + std::lock_guard<std::recursive_mutex> itsConnectionGuard(connectionsMutex_); std::shared_ptr<DBusConnection> connection; - { - std::lock_guard<std::mutex> itsConnectionGuard(connectionsMutex_); - for (auto itsConnectionIterator = connections_.begin(); itsConnectionIterator != connections_.end(); itsConnectionIterator++) { - if (itsConnectionIterator->second == _connection) { - connection = itsConnectionIterator->second; - break; - } + for (auto itsConnectionIterator = connections_.begin(); itsConnectionIterator != connections_.end(); itsConnectionIterator++) { + if (itsConnectionIterator->second == _connection) { + connection = itsConnectionIterator->second; + break; } } if(connection) connection->incrementConnection(); - - std::shared_ptr<DBusConnection> contextConnection; - { - std::lock_guard<std::mutex> itsContextConnectionGuard(contextConnectionsMutex_); - for (auto itsConnectionIterator = contextConnections_.begin(); itsConnectionIterator != contextConnections_.end(); itsConnectionIterator++) { - if (itsConnectionIterator->second == _connection) { - contextConnection = itsConnectionIterator->second; - break; - } - } - } - - if(contextConnection) - contextConnection->incrementConnection(); } void Factory::decrementConnection(std::shared_ptr<DBusProxyConnection> _connection) { + std::lock_guard<std::recursive_mutex> itsConnectionGuard(connectionsMutex_); std::shared_ptr<DBusConnection> connection; - { - std::lock_guard<std::mutex> itsConnectionGuard(connectionsMutex_); - for (auto itsConnectionIterator = connections_.begin(); itsConnectionIterator != connections_.end(); itsConnectionIterator++) { - if (itsConnectionIterator->second == _connection) { - connection = itsConnectionIterator->second; - break; - } + for (auto itsConnectionIterator = connections_.begin(); itsConnectionIterator != connections_.end(); itsConnectionIterator++) { + if (itsConnectionIterator->second == _connection) { + connection = itsConnectionIterator->second; + break; } } if(connection) connection->decrementConnection(); - - std::shared_ptr<DBusConnection> contextConnection; - { - std::lock_guard<std::mutex> itsContextConnectionGuard(contextConnectionsMutex_); - for (auto itsConnectionIterator = contextConnections_.begin(); itsConnectionIterator != contextConnections_.end(); itsConnectionIterator++) { - if (itsConnectionIterator->second == _connection) { - contextConnection = itsConnectionIterator->second; - break; - } - } - } - - if(contextConnection) - contextConnection->decrementConnection(); } -void Factory::releaseConnection(const ConnectionId_t& _connectionId, MainLoopContext* _mainloopContext) { - { - std::lock_guard<std::mutex> itsConnectionGuard(connectionsMutex_); - auto connection = connections_.find(_connectionId); +void Factory::releaseConnection(const ConnectionId_t& _connectionId) { + std::lock_guard<std::recursive_mutex> itsConnectionGuard(connectionsMutex_); + auto itsConnection = connections_.find(_connectionId); - if (connection != connections_.end()) { - DBusServiceRegistry::remove(connection->second); - connections_.erase(_connectionId); - } - } - - { - std::lock_guard<std::mutex> itsContextConnectionGuard(contextConnectionsMutex_); - auto connectionContext = contextConnections_.find(_mainloopContext); - - if (connectionContext != contextConnections_.end()) { - DBusServiceRegistry::remove(connectionContext->second); - contextConnections_.erase(_mainloopContext); - } + if (itsConnection != connections_.end()) { + DBusServiceRegistry::remove(itsConnection->second); + connections_.erase(_connectionId); } } diff --git a/src/CommonAPI/DBus/DBusInstanceAvailabilityStatusChangedEvent.cpp b/src/CommonAPI/DBus/DBusInstanceAvailabilityStatusChangedEvent.cpp new file mode 100644 index 0000000..4f93feb --- /dev/null +++ b/src/CommonAPI/DBus/DBusInstanceAvailabilityStatusChangedEvent.cpp @@ -0,0 +1,269 @@ +// Copyright (C) 2013-2015 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#include <CommonAPI/DBus/DBusInstanceAvailabilityStatusChangedEvent.hpp> + +#include <CommonAPI/DBus/DBusAddressTranslator.hpp> + +namespace CommonAPI { +namespace DBus { + +DBusInstanceAvailabilityStatusChangedEvent::DBusInstanceAvailabilityStatusChangedEvent( + DBusProxy &_proxy, + const std::string &_dbusInterfaceName, + const std::string &_capiInterfaceName) : + proxy_(_proxy), + observedDbusInterfaceName_(_dbusInterfaceName), + observedCapiInterfaceName_(_capiInterfaceName), + registry_(DBusServiceRegistry::get(_proxy.getDBusConnection())) { +} + +DBusInstanceAvailabilityStatusChangedEvent::~DBusInstanceAvailabilityStatusChangedEvent() { + proxy_.removeSignalMemberHandler(interfacesAddedSubscription_, this); + proxy_.removeSignalMemberHandler(interfacesRemovedSubscription_, this); +} + +void DBusInstanceAvailabilityStatusChangedEvent::onSignalDBusMessage(const DBusMessage& dbusMessage) { + if (dbusMessage.hasMemberName("InterfacesAdded")) { + onInterfacesAddedSignal(dbusMessage); + } else if (dbusMessage.hasMemberName("InterfacesRemoved")) { + onInterfacesRemovedSignal(dbusMessage); + } +} + +void DBusInstanceAvailabilityStatusChangedEvent::getAvailableServiceInstances( + CommonAPI::CallStatus &_status, + std::vector<DBusAddress> &_availableServiceInstances) { + + _availableServiceInstances.clear(); + DBusObjectManagerStub::DBusObjectPathAndInterfacesDict itsAvailableServiceInstances; + registry_->getAvailableServiceInstances(proxy_.getDBusAddress().getService(), + proxy_.getDBusAddress().getObjectPath(), + itsAvailableServiceInstances); + + _status = CommonAPI::CallStatus::SUCCESS; + translate(itsAvailableServiceInstances, _availableServiceInstances); +} + +std::future<CallStatus> DBusInstanceAvailabilityStatusChangedEvent::getAvailableServiceInstancesAsync( + GetAvailableServiceInstancesCallback _callback) { + + std::shared_ptr<std::promise<CallStatus> > promise = std::make_shared<std::promise<CallStatus>>(); + registry_->getAvailableServiceInstancesAsync(std::bind( + &DBusInstanceAvailabilityStatusChangedEvent::serviceInstancesAsyncCallback, + this, + proxy_.shared_from_this(), + std::placeholders::_1, + _callback, + promise), + proxy_.getDBusAddress().getService(), + proxy_.getDBusAddress().getObjectPath()); + return promise->get_future(); +} + +void DBusInstanceAvailabilityStatusChangedEvent::getServiceInstanceAvailabilityStatus( + const std::string &_instance, + CallStatus &_callStatus, + AvailabilityStatus &_availabilityStatus) { + + CommonAPI::Address itsAddress("local", observedCapiInterfaceName_, _instance); + DBusAddress itsDBusAddress; + DBusAddressTranslator::get()->translate(itsAddress, itsDBusAddress); + + _availabilityStatus = AvailabilityStatus::NOT_AVAILABLE; + if (registry_->isServiceInstanceAlive( + itsDBusAddress.getInterface(), + itsDBusAddress.getService(), + itsDBusAddress.getObjectPath())) { + _availabilityStatus = AvailabilityStatus::AVAILABLE; + } + _callStatus = CallStatus::SUCCESS; +} + +std::future<CallStatus> DBusInstanceAvailabilityStatusChangedEvent::getServiceInstanceAvailabilityStatusAsync( + const std::string& _instance, + ProxyManager::GetInstanceAvailabilityStatusCallback _callback) { + + std::shared_ptr<std::promise<CallStatus> > promise = std::make_shared<std::promise<CallStatus>>(); + auto proxy = proxy_.shared_from_this(); + std::async(std::launch::async, [this, _instance, _callback, promise, proxy]() { + CallStatus callStatus; + AvailabilityStatus availabilityStatus; + getServiceInstanceAvailabilityStatus(_instance, callStatus, availabilityStatus); + _callback(callStatus, availabilityStatus); + promise->set_value(callStatus); + }); + + return promise->get_future(); +} + +void DBusInstanceAvailabilityStatusChangedEvent::onFirstListenerAdded(const Listener&) { + + interfacesAddedSubscription_ = proxy_.addSignalMemberHandler( + proxy_.getDBusAddress().getObjectPath(), + DBusObjectManagerStub::getInterfaceName(), + "InterfacesAdded", + "oa{sa{sv}}", + this, + false); + + interfacesRemovedSubscription_ = proxy_.addSignalMemberHandler( + proxy_.getDBusAddress().getObjectPath(), + DBusObjectManagerStub::getInterfaceName(), + "InterfacesRemoved", + "oas", + this, + false); + + getAvailableServiceInstancesAsync([&](const CallStatus &_status, + const std::vector<DBusAddress> &_availableServices) { + if(_status == CallStatus::SUCCESS) { + for(auto service : _availableServices) { + if(service.getInterface() != observedDbusInterfaceName_) + continue; + if(addInterface(service.getObjectPath(), observedDbusInterfaceName_)) + notifyInterfaceStatusChanged(service.getObjectPath(), observedDbusInterfaceName_, AvailabilityStatus::AVAILABLE); + } + } + }); +} + +void DBusInstanceAvailabilityStatusChangedEvent::onLastListenerRemoved(const Listener&) { + proxy_.removeSignalMemberHandler(interfacesAddedSubscription_, this); + proxy_.removeSignalMemberHandler(interfacesRemovedSubscription_, this); +} + +void DBusInstanceAvailabilityStatusChangedEvent::onInterfacesAddedSignal(const DBusMessage &_message) { + DBusInputStream dbusInputStream(_message); + std::string dbusObjectPath; + std::string dbusInterfaceName; + DBusInterfacesAndPropertiesDict dbusInterfacesAndPropertiesDict; + + dbusInputStream >> dbusObjectPath; + if (dbusInputStream.hasError()) { + COMMONAPI_ERROR(std::string(__FUNCTION__) + " failed to read object path"); + } + + dbusInputStream.beginReadMapOfSerializableStructs(); + while (!dbusInputStream.readMapCompleted()) { + dbusInputStream.align(8); + dbusInputStream >> dbusInterfaceName; + dbusInputStream.skipMap(); + if (dbusInputStream.hasError()) { + COMMONAPI_ERROR(std::string(__FUNCTION__) + " failed to read interface name"); + } + if(dbusInterfaceName == observedDbusInterfaceName_ && addInterface(dbusObjectPath, dbusInterfaceName)) { + notifyInterfaceStatusChanged(dbusObjectPath, dbusInterfaceName, AvailabilityStatus::AVAILABLE); + } + } + dbusInputStream.endReadMapOfSerializableStructs(); +} + +void DBusInstanceAvailabilityStatusChangedEvent::onInterfacesRemovedSignal(const DBusMessage &_message) { + DBusInputStream dbusInputStream(_message); + std::string dbusObjectPath; + std::vector<std::string> dbusInterfaceNames; + + dbusInputStream >> dbusObjectPath; + if (dbusInputStream.hasError()) { + COMMONAPI_ERROR(std::string(__FUNCTION__) + " failed to read object path"); + } + + dbusInputStream >> dbusInterfaceNames; + if (dbusInputStream.hasError()) { + COMMONAPI_ERROR(std::string(__FUNCTION__) + " failed to read interface names"); + } + + for (const auto& dbusInterfaceName : dbusInterfaceNames) { + if(dbusInterfaceName == observedDbusInterfaceName_ && removeInterface(dbusObjectPath, dbusInterfaceName)) { + notifyInterfaceStatusChanged(dbusObjectPath, dbusInterfaceName, AvailabilityStatus::NOT_AVAILABLE); + } + } +} + +void DBusInstanceAvailabilityStatusChangedEvent::notifyInterfaceStatusChanged( + const std::string &_objectPath, + const std::string &_interfaceName, + const AvailabilityStatus &_availability) { + CommonAPI::Address itsAddress; + DBusAddress itsDBusAddress(proxy_.getDBusAddress().getService(), + _objectPath, + _interfaceName); + + DBusAddressTranslator::get()->translate(itsDBusAddress, itsAddress); + + // ensure, the proxy and the event survives until notification is done + auto itsProxy = proxy_.shared_from_this(); + + notifyListeners(itsAddress.getAddress(), _availability); +} + +bool DBusInstanceAvailabilityStatusChangedEvent::addInterface( + const std::string &_dbusObjectPath, + const std::string &_dbusInterfaceName) { + std::lock_guard<std::mutex> lock(interfacesMutex_); + auto it = interfaces_.find(_dbusObjectPath); + if (it == interfaces_.end()) { + std::set<std::string> itsInterfaces; + itsInterfaces.insert(_dbusInterfaceName); + interfaces_[_dbusObjectPath] = itsInterfaces; + return true; + } else { + if(it->second.insert(_dbusInterfaceName).second) + return true; + } + return false; +} + +bool DBusInstanceAvailabilityStatusChangedEvent::removeInterface( + const std::string &_dbusObjectPath, + const std::string &_dbusInterfaceName) { + std::lock_guard<std::mutex> lock(interfacesMutex_); + auto it = interfaces_.find(_dbusObjectPath); + if(it != interfaces_.end()) { + if(it->second.erase(_dbusInterfaceName) > 0) + return true; + } + return false; +} + +void DBusInstanceAvailabilityStatusChangedEvent::serviceInstancesAsyncCallback( + std::shared_ptr<Proxy> _proxy, + const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict _dict, + GetAvailableServiceInstancesCallback &_call, + std::shared_ptr<std::promise<CallStatus> > &_promise) { + (void)_proxy; + std::vector<DBusAddress> result; + translate(_dict, result); + _call(CallStatus::SUCCESS, result); + _promise->set_value(CallStatus::SUCCESS); +} + +void DBusInstanceAvailabilityStatusChangedEvent::translate( + const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict &_dict, + std::vector<DBusAddress> &_serviceInstances) { + DBusAddress itsDBusAddress; + + const std::string &itsService = proxy_.getDBusAddress().getService(); + itsDBusAddress.setService(itsService); + + for (const auto &objectPathIter : _dict) { + itsDBusAddress.setObjectPath(objectPathIter.first); + + const auto &interfacesDict = objectPathIter.second; + for (const auto &interfaceIter : interfacesDict) { + + if (interfaceIter.first == observedDbusInterfaceName_) { + itsDBusAddress.setInterface(interfaceIter.first); + _serviceInstances.push_back(itsDBusAddress); + } + } + } +} + +} // namespace DBus +} // namespace CommonAPI + + diff --git a/src/CommonAPI/DBus/DBusMainLoopContext.cpp b/src/CommonAPI/DBus/DBusMainLoopContext.cpp index 5c8afa5..428807f 100644 --- a/src/CommonAPI/DBus/DBusMainLoopContext.cpp +++ b/src/CommonAPI/DBus/DBusMainLoopContext.cpp @@ -41,36 +41,36 @@ bool DBusDispatchSource::dispatch() { return dbusConnection_->singleDispatch(); } -DBusMessageDispatchSource::DBusMessageDispatchSource(DBusMessageWatch* watch) : +DBusQueueDispatchSource::DBusQueueDispatchSource(DBusQueueWatch* watch) : watch_(watch) { watch_->addDependentDispatchSource(this); } -DBusMessageDispatchSource::~DBusMessageDispatchSource() { +DBusQueueDispatchSource::~DBusQueueDispatchSource() { std::unique_lock<std::mutex> itsLock(watchMutex_); watch_->removeDependentDispatchSource(this); } -bool DBusMessageDispatchSource::prepare(int64_t& timeout) { +bool DBusQueueDispatchSource::prepare(int64_t& timeout) { std::unique_lock<std::mutex> itsLock(watchMutex_); timeout = -1; - return !watch_->emptyMsgQueue(); + return !watch_->emptyQueue(); } -bool DBusMessageDispatchSource::check() { +bool DBusQueueDispatchSource::check() { std::unique_lock<std::mutex> itsLock(watchMutex_); - return !watch_->emptyMsgQueue(); + return !watch_->emptyQueue(); } -bool DBusMessageDispatchSource::dispatch() { +bool DBusQueueDispatchSource::dispatch() { std::unique_lock<std::mutex> itsLock(watchMutex_); - if (!watch_->emptyMsgQueue()) { - auto queueEntry = watch_->frontMsgQueue(); - watch_->popMsgQueue(); - watch_->processMsgQueueEntry(queueEntry); + if (!watch_->emptyQueue()) { + auto queueEntry = watch_->frontQueue(); + watch_->popQueue(); + watch_->processQueueEntry(queueEntry); } - return !watch_->emptyMsgQueue(); + return !watch_->emptyQueue(); } DBusWatch::DBusWatch(::DBusWatch* libdbusWatch, std::weak_ptr<MainLoopContext>& mainLoopContext, @@ -179,19 +179,7 @@ void DBusWatch::addDependentDispatchSource(DispatchSource* dispatchSource) { dependentDispatchSources_.push_back(dispatchSource); } -void DBusMessageWatch::MsgReplyQueueEntry::process(std::shared_ptr<DBusConnection> _connection) { - _connection->dispatchDBusMessageReply(message_, replyAsyncHandler_); -} - -void DBusMessageWatch::MsgReplyQueueEntry::clear() { - delete replyAsyncHandler_; -} - -void DBusMessageWatch::MsgQueueEntry::clear() { - -} - -DBusMessageWatch::DBusMessageWatch(std::shared_ptr<DBusConnection> _connection) : pipeValue_(4) { +DBusQueueWatch::DBusQueueWatch(std::shared_ptr<DBusConnection> _connection) : pipeValue_(4) { #ifdef WIN32 std::string pipeName = "\\\\.\\pipe\\CommonAPI-DBus-"; @@ -280,7 +268,7 @@ DBusMessageWatch::DBusMessageWatch(std::shared_ptr<DBusConnection> _connection) connection_ = _connection; } -DBusMessageWatch::~DBusMessageWatch() { +DBusQueueWatch::~DBusQueueWatch() { #ifdef WIN32 BOOL retVal = DisconnectNamedPipe((HANDLE)pipeFileDescriptors_[0]); @@ -304,36 +292,36 @@ DBusMessageWatch::~DBusMessageWatch() { close(pipeFileDescriptors_[1]); #endif - std::unique_lock<std::mutex> itsLock(msgQueueMutex_); - while(!msgQueue_.empty()) { - auto queueEntry = msgQueue_.front(); - msgQueue_.pop(); + std::unique_lock<std::mutex> itsLock(queueMutex_); + while(!queue_.empty()) { + auto queueEntry = queue_.front(); + queue_.pop(); queueEntry->clear(); } } -void DBusMessageWatch::dispatch(unsigned int) { +void DBusQueueWatch::dispatch(unsigned int) { } -const pollfd& DBusMessageWatch::getAssociatedFileDescriptor() { +const pollfd& DBusQueueWatch::getAssociatedFileDescriptor() { return pollFileDescriptor_; } #ifdef WIN32 -const HANDLE& DBusMessageWatch::getAssociatedEvent() { +const HANDLE& DBusQueueWatch::getAssociatedEvent() { return wsaEvent_; } #endif -const std::vector<DispatchSource*>& DBusMessageWatch::getDependentDispatchSources() { +const std::vector<DispatchSource*>& DBusQueueWatch::getDependentDispatchSources() { return dependentDispatchSources_; } -void DBusMessageWatch::addDependentDispatchSource(CommonAPI::DispatchSource* _dispatchSource) { +void DBusQueueWatch::addDependentDispatchSource(CommonAPI::DispatchSource* _dispatchSource) { dependentDispatchSources_.push_back(_dispatchSource); } -void DBusMessageWatch::removeDependentDispatchSource(CommonAPI::DispatchSource* _dispatchSource) { +void DBusQueueWatch::removeDependentDispatchSource(CommonAPI::DispatchSource* _dispatchSource) { std::vector<CommonAPI::DispatchSource*>::iterator it; for (it = dependentDispatchSources_.begin(); it != dependentDispatchSources_.end(); it++) { @@ -344,9 +332,9 @@ void DBusMessageWatch::removeDependentDispatchSource(CommonAPI::DispatchSource* } } -void DBusMessageWatch::pushMsgQueue(std::shared_ptr<MsgQueueEntry> _queueEntry) { - std::unique_lock<std::mutex> itsLock(msgQueueMutex_); - msgQueue_.push(_queueEntry); +void DBusQueueWatch::pushQueue(std::shared_ptr<QueueEntry> _queueEntry) { + std::unique_lock<std::mutex> itsLock(queueMutex_); + queue_.push(_queueEntry); #ifdef WIN32 char writeValue[sizeof(pipeValue_)]; @@ -371,8 +359,8 @@ void DBusMessageWatch::pushMsgQueue(std::shared_ptr<MsgQueueEntry> _queueEntry) #endif } -void DBusMessageWatch::popMsgQueue() { - std::unique_lock<std::mutex> itsLock(msgQueueMutex_); +void DBusQueueWatch::popQueue() { + std::unique_lock<std::mutex> itsLock(queueMutex_); #ifdef WIN32 char readValue[sizeof(pipeValue_)]; @@ -396,32 +384,42 @@ void DBusMessageWatch::popMsgQueue() { } #endif - msgQueue_.pop(); + queue_.pop(); } -std::shared_ptr<DBusMessageWatch::MsgQueueEntry> DBusMessageWatch::frontMsgQueue() { - std::unique_lock<std::mutex> itsLock(msgQueueMutex_); +std::shared_ptr<QueueEntry> DBusQueueWatch::frontQueue() { + std::unique_lock<std::mutex> itsLock(queueMutex_); - return msgQueue_.front(); + return queue_.front(); } -bool DBusMessageWatch::emptyMsgQueue() { - std::unique_lock<std::mutex> itsLock(msgQueueMutex_); +bool DBusQueueWatch::emptyQueue() { + std::unique_lock<std::mutex> itsLock(queueMutex_); - return msgQueue_.empty(); + return queue_.empty(); } -void DBusMessageWatch::processMsgQueueEntry(std::shared_ptr<DBusMessageWatch::MsgQueueEntry> _queueEntry) { +void DBusQueueWatch::processQueueEntry(std::shared_ptr<QueueEntry> _queueEntry) { std::shared_ptr<DBusConnection> itsConnection = connection_.lock(); if(itsConnection) { _queueEntry->process(itsConnection); } } -DBusTimeout::DBusTimeout(::DBusTimeout* libdbusTimeout, std::weak_ptr<MainLoopContext>& mainLoopContext) : +#ifdef WIN32 +__declspec(thread) DBusTimeout* DBusTimeout::currentTimeout_ = NULL; +#else +thread_local DBusTimeout* DBusTimeout::currentTimeout_ = NULL; +#endif + +DBusTimeout::DBusTimeout(::DBusTimeout* libdbusTimeout, std::weak_ptr<MainLoopContext>& mainLoopContext, + std::weak_ptr<DBusConnection>& dbusConnection) : dueTimeInMs_(TIMEOUT_INFINITE), libdbusTimeout_(libdbusTimeout), - mainLoopContext_(mainLoopContext) { + mainLoopContext_(mainLoopContext), + dbusConnection_(dbusConnection), + pendingCall_(NULL) { + currentTimeout_ = this; } bool DBusTimeout::isReadyToBeMonitored() { @@ -447,9 +445,16 @@ void DBusTimeout::stopMonitoring() { } bool DBusTimeout::dispatch() { - recalculateDueTime(); - dbus_timeout_handle(libdbusTimeout_); - return true; + std::shared_ptr<DBusConnection> itsConnection = dbusConnection_.lock(); + if(itsConnection) { + if(itsConnection->setDispatching(true)) { + recalculateDueTime(); + itsConnection->setPendingCallTimedOut(pendingCall_, libdbusTimeout_); + itsConnection->setDispatching(false); + return true; + } + } + return false; } int64_t DBusTimeout::getTimeoutInterval() const { @@ -469,5 +474,9 @@ void DBusTimeout::recalculateDueTime() { } } +void DBusTimeout::setPendingCall(DBusPendingCall* _pendingCall) { + pendingCall_ = _pendingCall; +} + } // namespace DBus } // namespace CommonAPI diff --git a/src/CommonAPI/DBus/DBusOutputStream.cpp b/src/CommonAPI/DBus/DBusOutputStream.cpp index 6fd5d0a..564aa61 100644 --- a/src/CommonAPI/DBus/DBusOutputStream.cpp +++ b/src/CommonAPI/DBus/DBusOutputStream.cpp @@ -73,7 +73,7 @@ DBusOutputStream& DBusOutputStream::writeString(const char *_value, const uint32 if (NULL == _value) { COMMONAPI_ERROR(std::string(__FUNCTION__) + " _value == NULL"); } else if (_value[_length] != '\0') { - COMMONAPI_ERROR(std::string(__FUNCTION__) + " _value is not zero-terminated") + COMMONAPI_ERROR(std::string(__FUNCTION__) + " _value is not zero-terminated"); } else { _writeValue(_length); _writeRaw(_value, _length + 1); @@ -84,7 +84,12 @@ DBusOutputStream& DBusOutputStream::writeString(const char *_value, const uint32 DBusOutputStream& DBusOutputStream::writeByteBuffer(const uint8_t *_value, const uint32_t &_length) { if (NULL == _value) { - COMMONAPI_ERROR(std::string(__FUNCTION__) + " _value == NULL"); + if (0 != _length) { + COMMONAPI_ERROR(std::string(__FUNCTION__) + " _value == NULL && _length != 0"); + } else { + COMMONAPI_WARNING(std::string(__FUNCTION__) + " _value == NULL"); + _writeValue(_length); + } } else { _writeValue(_length); _writeRaw(reinterpret_cast<const char*>(_value), _length); diff --git a/src/CommonAPI/DBus/DBusProxy.cpp b/src/CommonAPI/DBus/DBusProxy.cpp index 95c3d59..655fa6c 100644 --- a/src/CommonAPI/DBus/DBusProxy.cpp +++ b/src/CommonAPI/DBus/DBusProxy.cpp @@ -9,6 +9,7 @@ #include <CommonAPI/DBus/DBusProxy.hpp> #include <CommonAPI/DBus/DBusUtils.hpp> #include <CommonAPI/DBus/DBusProxyAsyncSignalMemberCallbackHandler.hpp> +#include <CommonAPI/DBus/DBusConnection.hpp> #include <CommonAPI/Logger.hpp> namespace CommonAPI { @@ -18,10 +19,36 @@ DBusProxyStatusEvent::DBusProxyStatusEvent(DBusProxy *_dbusProxy) : dbusProxy_(_dbusProxy) { } -void DBusProxyStatusEvent::onListenerAdded(const Listener &_listener, const Subscription _subscription) { - (void)_subscription; - if (dbusProxy_->isAvailable()) - _listener(AvailabilityStatus::AVAILABLE); +void DBusProxyStatusEvent::onListenerAdded(const Listener &_listener, + const Subscription _subscription) { + std::lock_guard<std::recursive_mutex> listenersLock(listenersMutex_); + + //notify listener about availability status -> push function to mainloop + std::weak_ptr<DBusProxy> itsdbusProxy = dbusProxy_->shared_from_this(); + std::function<void(std::weak_ptr<DBusProxy>, Listener, Subscription)> notifySpecificListenerHandler = + std::bind(&DBusProxy::notifySpecificListener, + dbusProxy_, + std::placeholders::_1, + std::placeholders::_2, + std::placeholders::_3); + dbusProxy_->getDBusConnection()->proxyPushFunctionToMainLoop<DBusConnection>( + notifySpecificListenerHandler, + itsdbusProxy, + _listener, + _subscription); +} + +void DBusProxyStatusEvent::onListenerRemoved(const Listener& _listener, + const Subscription _subscription) { + std::lock_guard<std::recursive_mutex> listenersLock(listenersMutex_); + (void)_listener; + auto listenerIt = listeners_.begin(); + while(listenerIt != listeners_.end()) { + if(listenerIt->first == _subscription) + listenerIt = listeners_.erase(listenerIt); + else + ++listenerIt; + } } void DBusProxy::availabilityTimeoutThreadHandler() const { @@ -36,7 +63,7 @@ void DBusProxy::availabilityTimeoutThreadHandler() const { isAvailableAsyncCallback, std::promise<AvailabilityStatus>, AvailabilityStatus, - std::chrono::time_point<std::chrono::high_resolution_clock> + std::chrono::steady_clock::time_point > CallbackData_t; std::list<CallbackData_t> callbacks; @@ -46,14 +73,14 @@ void DBusProxy::availabilityTimeoutThreadHandler() const { timeoutsMutex_.lock(); int timeout = std::numeric_limits<int>::max(); - std::chrono::time_point<std::chrono::high_resolution_clock> minTimeout; + std::chrono::steady_clock::time_point minTimeout; if (timeouts_.size() > 0) { auto minTimeoutElement = std::min_element(timeouts_.begin(), timeouts_.end(), [] (const AvailabilityTimeout_t& lhs, const AvailabilityTimeout_t& rhs) { return std::get<0>(lhs) < std::get<0>(rhs); }); minTimeout = std::get<0>(*minTimeoutElement); - std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now(); + std::chrono::steady_clock::time_point now = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now(); timeout = (int)std::chrono::duration_cast<std::chrono::milliseconds>(minTimeout - now).count(); } timeoutsMutex_.unlock(); @@ -66,21 +93,22 @@ void DBusProxy::availabilityTimeoutThreadHandler() const { //iterate through timeouts auto it = timeouts_.begin(); while (it != timeouts_.end()) { - std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now(); + std::chrono::steady_clock::time_point now = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now(); isAvailableAsyncCallback callback = std::get<1>(*it); if (now > std::get<0>(*it)) { //timeout availabilityMutex_.lock(); + std::chrono::steady_clock::time_point timepoint_; if(isAvailable()) callbacks.push_back(std::make_tuple(callback, std::move(std::get<2>(*it)), AvailabilityStatus::AVAILABLE, - std::chrono::time_point<std::chrono::high_resolution_clock>())); + timepoint_)); else callbacks.push_back(std::make_tuple(callback, std::move(std::get<2>(*it)), AvailabilityStatus::NOT_AVAILABLE, - std::chrono::time_point<std::chrono::high_resolution_clock>())); + timepoint_)); it = timeouts_.erase(it); availabilityMutex_.unlock(); } else { @@ -131,7 +159,7 @@ void DBusProxy::availabilityTimeoutThreadHandler() const { isAvailableAsyncCallback callback; AvailabilityStatus avStatus; int remainingTimeout; - std::chrono::high_resolution_clock::time_point now; + std::chrono::steady_clock::time_point now; auto it = callbacks.begin(); while(it != callbacks.end()) { @@ -139,7 +167,7 @@ void DBusProxy::availabilityTimeoutThreadHandler() const { avStatus = std::get<2>(*it); // compute remaining timeout - now = std::chrono::high_resolution_clock::now(); + now = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now(); remainingTimeout = (int)std::chrono::duration_cast<std::chrono::milliseconds>(std::get<3>(*it) - now).count(); if(remainingTimeout < 0) remainingTimeout = 0; @@ -170,10 +198,10 @@ DBusProxy::DBusProxy(const DBusAddress &_dbusAddress, interfaceVersionAttribute_(*this, "uu", "getInterfaceVersion"), dbusServiceRegistry_(DBusServiceRegistry::get(_connection)) { - Factory::get()->incrementConnection(connection_); } void DBusProxy::init() { + selfReference_ = shared_from_this(); dbusServiceRegistrySubscription_ = dbusServiceRegistry_->subscribeAvailabilityListener( getAddress().getAddress(), std::bind(&DBusProxy::onDBusServiceInstanceStatus, this, std::placeholders::_1)); @@ -216,7 +244,7 @@ std::future<AvailabilityStatus> DBusProxy::isAvailableAsync( std::future<AvailabilityStatus> future = promise.get_future(); //set timeout point - auto timeoutPoint = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(_info->timeout_); + auto timeoutPoint = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now() + std::chrono::milliseconds(_info->timeout_); timeoutsMutex_.lock(); if(timeouts_.size() == 0) { @@ -258,6 +286,10 @@ std::future<AvailabilityStatus> DBusProxy::isAvailableAsync( return future; } +AvailabilityStatus DBusProxy::getAvailabilityStatus() const { + return availabilityStatus_; +} + ProxyStatusEvent& DBusProxy::getProxyStatusEvent() { return dbusProxyStatusEvent_; } @@ -286,7 +318,24 @@ void DBusProxy::signalInitialValueCallback(const CallStatus _status, } } +void DBusProxy::notifySpecificListener(std::weak_ptr<DBusProxy> _dbusProxy, + const ProxyStatusEvent::Listener &_listener, + const ProxyStatusEvent::Subscription _subscription) { + if(_dbusProxy.lock()) { + std::lock_guard<std::recursive_mutex> listenersLock(dbusProxyStatusEvent_.listenersMutex_); + + AvailabilityStatus itsStatus = availabilityStatus_; + if (itsStatus != AvailabilityStatus::UNKNOWN) + dbusProxyStatusEvent_.notifySpecificListener(_subscription, itsStatus); + + //add listener to list so that it can be notified about a change of availability + dbusProxyStatusEvent_.listeners_.push_back(std::make_pair(_subscription, _listener)); + } +} + void DBusProxy::onDBusServiceInstanceStatus(const AvailabilityStatus& availabilityStatus) { + //ensure, proxy survives until notification is done + auto itsSelf = selfReference_.lock(); if (availabilityStatus != availabilityStatus_) { availabilityMutex_.lock(); availabilityStatus_ = availabilityStatus; @@ -297,7 +346,11 @@ void DBusProxy::onDBusServiceInstanceStatus(const AvailabilityStatus& availabili availabilityTimeoutCondition_.notify_all(); availabilityTimeoutThreadMutex_.unlock(); - dbusProxyStatusEvent_.notifyListeners(availabilityStatus); + { + std::lock_guard<std::recursive_mutex> subscribersLock(dbusProxyStatusEvent_.listenersMutex_); + for(auto listenerIt : dbusProxyStatusEvent_.listeners_) + dbusProxyStatusEvent_.notifySpecificListener(listenerIt.first, availabilityStatus_); + } if (availabilityStatus == AvailabilityStatus::AVAILABLE) { std::lock_guard < std::mutex > queueLock(signalMemberHandlerQueueMutex_); @@ -335,9 +388,11 @@ void DBusProxy::onDBusServiceInstanceStatus(const AvailabilityStatus& availabili { std::lock_guard < std::mutex > queueLock(selectiveBroadcastHandlersMutex_); for (auto selectiveBroadcasts : selectiveBroadcastHandlers) { - std::string methodName = "subscribeFor" + selectiveBroadcasts.first + "Selective"; - connection_->sendPendingSelectiveSubscription(this, methodName, selectiveBroadcasts.second.first, - selectiveBroadcasts.second.second); + connection_->sendPendingSelectiveSubscription(this, + selectiveBroadcasts.first, + std::get<0>(selectiveBroadcasts.second), + std::get<1>(selectiveBroadcasts.second), + std::get<2>(selectiveBroadcasts.second)); } } } else { @@ -365,9 +420,11 @@ void DBusProxy::onDBusServiceInstanceStatus(const AvailabilityStatus& availabili } void DBusProxy::insertSelectiveSubscription(const std::string& interfaceMemberName, - DBusProxyConnection::DBusSignalHandler* dbusSignalHandler, uint32_t tag) { + DBusProxyConnection::DBusSignalHandler* dbusSignalHandler, uint32_t tag, + std::string interfaceMemberSignature) { std::lock_guard < std::mutex > queueLock(selectiveBroadcastHandlersMutex_); - selectiveBroadcastHandlers[interfaceMemberName] = std::make_pair(dbusSignalHandler, tag); + selectiveBroadcastHandlers[interfaceMemberName] = std::make_tuple( + dbusSignalHandler, tag, interfaceMemberSignature); } void DBusProxy::subscribeForSelectiveBroadcastOnConnection( @@ -585,6 +642,5 @@ void DBusProxy::freeDesktopGetCurrentValueForSignalListener( } } - } // namespace DBus } // namespace CommonAPI diff --git a/src/CommonAPI/DBus/DBusProxyManager.cpp b/src/CommonAPI/DBus/DBusProxyManager.cpp index e36f6e1..cedd58b 100644 --- a/src/CommonAPI/DBus/DBusProxyManager.cpp +++ b/src/CommonAPI/DBus/DBusProxyManager.cpp @@ -12,11 +12,12 @@ namespace DBus { DBusProxyManager::DBusProxyManager( DBusProxy &_proxy, - const std::string &_interfaceId) + const std::string &_dbusInterfaceId, + const std::string &_capiInterfaceId) : proxy_(_proxy), - instanceAvailabilityStatusEvent_(_proxy, _interfaceId), - interfaceId_(_interfaceId), - registry_(DBusServiceRegistry::get(_proxy.getDBusConnection())) + instanceAvailabilityStatusEvent_(_proxy, _dbusInterfaceId, _capiInterfaceId), + dbusInterfaceId_(_dbusInterfaceId), + capiInterfaceId_(_capiInterfaceId) { } @@ -28,7 +29,7 @@ DBusProxyManager::getDomain() const { const std::string & DBusProxyManager::getInterface() const { - return interfaceId_; + return capiInterfaceId_; } const ConnectionId_t & @@ -40,14 +41,16 @@ DBusProxyManager::getConnectionId() const { void DBusProxyManager::instancesAsyncCallback( + std::shared_ptr<Proxy> _proxy, const CommonAPI::CallStatus &_status, - const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict &_dict, + const std::vector<DBusAddress> &_availableServiceInstances, GetAvailableInstancesCallback &_call) { - std::vector<std::string> result; + (void)_proxy; + std::vector<std::string> itsAvailableInstances; if (_status == CommonAPI::CallStatus::SUCCESS) { - translateCommonApiAddresses(_dict, result); + translate(_availableServiceInstances, itsAvailableInstances); } - _call(_status, result); + _call(_status, itsAvailableInstances); } void @@ -55,98 +58,38 @@ DBusProxyManager::getAvailableInstances( CommonAPI::CallStatus &_status, std::vector<std::string> &_availableInstances) { _availableInstances.clear(); - DBusObjectManagerStub::DBusObjectPathAndInterfacesDict dict; - - DBusProxyHelper< - DBusSerializableArguments<>, - DBusSerializableArguments< - DBusObjectManagerStub::DBusObjectPathAndInterfacesDict - > - >::callMethodWithReply(proxy_, - DBusObjectManagerStub::getInterfaceName(), - "GetManagedObjects", - "", - &defaultCallInfo, - _status, - dict); - - if (_status == CallStatus::SUCCESS) { - translateCommonApiAddresses(dict, _availableInstances); - } + std::vector<DBusAddress> itsAvailableServiceInstances; + instanceAvailabilityStatusEvent_.getAvailableServiceInstances(_status, itsAvailableServiceInstances); + translate(itsAvailableServiceInstances, _availableInstances); } std::future<CallStatus> DBusProxyManager::getAvailableInstancesAsync( GetAvailableInstancesCallback _callback) { - return CommonAPI::DBus::DBusProxyHelper< - CommonAPI::DBus::DBusSerializableArguments<>, - CommonAPI::DBus::DBusSerializableArguments< - DBusObjectManagerStub::DBusObjectPathAndInterfacesDict - > - >::callMethodAsync( - proxy_, - DBusObjectManagerStub::getInterfaceName(), - "GetManagedObjects", - "", - &defaultCallInfo, - std::move( - std::bind( - &DBusProxyManager::instancesAsyncCallback, - this, - std::placeholders::_1, std::placeholders::_2, - _callback - ) - ), - std::tuple<DBusObjectManagerStub::DBusObjectPathAndInterfacesDict>()); + return instanceAvailabilityStatusEvent_.getAvailableServiceInstancesAsync(std::bind( + &DBusProxyManager::instancesAsyncCallback, + this, + proxy_.shared_from_this(), + std::placeholders::_1, + std::placeholders::_2, + _callback)); } void DBusProxyManager::getInstanceAvailabilityStatus( - const std::string &_address, + const std::string &_instance, CallStatus &_callStatus, AvailabilityStatus &_availabilityStatus) { - - CommonAPI::Address itsAddress("local", interfaceId_, _address); - DBusAddress itsDBusAddress; - DBusAddressTranslator::get()->translate(itsAddress, itsDBusAddress); - - _availabilityStatus = AvailabilityStatus::NOT_AVAILABLE; - if (registry_->isServiceInstanceAlive( - itsDBusAddress.getInterface(), - itsDBusAddress.getService(), - itsDBusAddress.getObjectPath())) { - _availabilityStatus = AvailabilityStatus::AVAILABLE; - } - _callStatus = CallStatus::SUCCESS; -} - -void -DBusProxyManager::instanceAliveAsyncCallback( - const AvailabilityStatus &_alive, - GetInstanceAvailabilityStatusCallback &_call, - std::shared_ptr<std::promise<CallStatus> > &_status) { - _call(CallStatus::SUCCESS, _alive); - _status->set_value(CallStatus::SUCCESS); + instanceAvailabilityStatusEvent_.getServiceInstanceAvailabilityStatus(_instance, + _callStatus, + _availabilityStatus); } std::future<CallStatus> DBusProxyManager::getInstanceAvailabilityStatusAsync( const std::string &_instance, GetInstanceAvailabilityStatusCallback _callback) { - - CommonAPI::Address itsAddress("local", interfaceId_, _instance); - - std::shared_ptr<std::promise<CallStatus> > promise = std::make_shared<std::promise<CallStatus>>(); - registry_->subscribeAvailabilityListener( - itsAddress.getAddress(), - std::bind(&DBusProxyManager::instanceAliveAsyncCallback, - this, - std::placeholders::_1, - _callback, - promise) - ); - - return promise->get_future(); + return instanceAvailabilityStatusEvent_.getServiceInstanceAvailabilityStatusAsync(_instance, _callback); } DBusProxyManager::InstanceAvailabilityStatusChangedEvent & @@ -154,31 +97,12 @@ DBusProxyManager::getInstanceAvailabilityStatusChangedEvent() { return instanceAvailabilityStatusEvent_; } -void -DBusProxyManager::translateCommonApiAddresses( - const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict &_dict, - std::vector<std::string> &_instances) { - - CommonAPI::Address itsAddress; - DBusAddress itsDBusAddress; - - // get service information from proxy - const std::string &_service = proxy_.getDBusAddress().getService(); - itsDBusAddress.setService(_service); - - for (const auto &objectPathIter : _dict) { - itsDBusAddress.setObjectPath(objectPathIter.first); - - const auto &interfacesDict = objectPathIter.second; - for (const auto &interfaceIter : interfacesDict) { - - // return only those addresses whose interface matches with ours - if (interfaceIter.first == interfaceId_) { - itsDBusAddress.setInterface(interfaceIter.first); - DBusAddressTranslator::get()->translate(itsDBusAddress, itsAddress); - _instances.push_back(itsAddress.getInstance()); - } - } +void DBusProxyManager::translate(const std::vector<DBusAddress> &_serviceInstances, + std::vector<std::string> &_instances) { + CommonAPI::Address itsCapiAddress; + for(auto itsDbusAddress : _serviceInstances) { + DBusAddressTranslator::get()->translate(itsDbusAddress, itsCapiAddress); + _instances.push_back(itsCapiAddress.getInstance()); } } diff --git a/src/CommonAPI/DBus/DBusServiceRegistry.cpp b/src/CommonAPI/DBus/DBusServiceRegistry.cpp index 65290fc..d345a9d 100644 --- a/src/CommonAPI/DBus/DBusServiceRegistry.cpp +++ b/src/CommonAPI/DBus/DBusServiceRegistry.cpp @@ -46,8 +46,6 @@ DBusServiceRegistry::remove(std::shared_ptr<DBusProxyConnection> _connection) { DBusServiceRegistry::DBusServiceRegistry(std::shared_ptr<DBusProxyConnection> dbusProxyConnection) : dbusDaemonProxy_(std::make_shared<CommonAPI::DBus::DBusDaemonProxy>(dbusProxyConnection)), initialized_(false), - servicesToResolve(0), - objectPathsToResolve(0), notificationThread_() { } @@ -57,7 +55,6 @@ DBusServiceRegistry::~DBusServiceRegistry() { } dbusDaemonProxy_->getNameOwnerChangedEvent().unsubscribe(dbusDaemonProxyNameOwnerChangedEventSubscription_); - dbusDaemonProxy_->getProxyStatusEvent().unsubscribe(dbusDaemonProxyStatusEventSubscription_); // notify only listeners of resolved services (online > offline) for (auto& dbusServiceListenersIterator : dbusServiceListenersMap) { @@ -85,10 +82,6 @@ DBusServiceRegistry::~DBusServiceRegistry() { void DBusServiceRegistry::init() { translator_ = DBusAddressTranslator::get(); - dbusDaemonProxyStatusEventSubscription_ = - dbusDaemonProxy_->getProxyStatusEvent().subscribe( - std::bind(&DBusServiceRegistry::onDBusDaemonProxyStatusEvent, this, std::placeholders::_1)); - dbusDaemonProxyNameOwnerChangedEventSubscription_ = dbusDaemonProxy_->getNameOwnerChangedEvent().subscribe( std::bind(&DBusServiceRegistry::onDBusDaemonProxyNameOwnerChangedEvent, @@ -173,6 +166,8 @@ DBusServiceRegistry::subscribeAvailabilityListener( } dbusInterfaceNameListenersRecord.listenerList.push_front(std::move(serviceListener)); + dbusInterfaceNameListenersRecord.listenersToRemove.remove( + dbusInterfaceNameListenersRecord.listenerList.begin()); dbusServicesMutex_.unlock(); @@ -217,15 +212,8 @@ DBusServiceRegistry::unsubscribeAvailabilityListener( auto& dbusInterfaceNameListenersRecord = dbusInterfaceNameListenersIterator->second; - dbusInterfaceNameListenersRecord.listenerList.erase(listenerSubscription); - - if (dbusInterfaceNameListenersRecord.listenerList.empty()) { - dbusInterfaceNameListenersMap.erase(dbusInterfaceNameListenersIterator); - - if (dbusInterfaceNameListenersMap.empty()) { - dbusServiceListenersRecord.dbusObjectPathListenersMap.erase(dbusObjectPathListenersIterator); - } - } + // mark listener to remove + dbusInterfaceNameListenersRecord.listenersToRemove.push_back(listenerSubscription); dbusServicesMutex_.unlock(); } @@ -234,6 +222,7 @@ DBusServiceRegistry::unsubscribeAvailabilityListener( bool DBusServiceRegistry::isServiceInstanceAlive(const std::string& dbusInterfaceName, const std::string& dbusServiceName, const std::string& dbusObjectPath) { + bool result = false; std::chrono::milliseconds timeout(1000); bool uniqueNameFound = false; @@ -267,9 +256,8 @@ bool DBusServiceRegistry::isServiceInstanceAlive(const std::string& dbusInterfac std::shared_future<DBusRecordState> futureNameResolved = insertedDbusServiceListenerRecord.first->second.futureOnResolve; futureNameResolved.wait_for(timeout); - if(futureNameResolved.get() != DBusRecordState::RESOLVED) { + if(futureNameResolved.get() != DBusRecordState::RESOLVED) return false; - } dbusServicesMutex_.lock(); auto dbusServiceListenersMapIterator = dbusServiceListenersMap.find(dbusServiceName); @@ -296,68 +284,73 @@ bool DBusServiceRegistry::isServiceInstanceAlive(const std::string& dbusInterfac dbusUniqueNameRecord = &dbusUniqueNameRecordIterator->second; } - dbusServicesMutex_.unlock(); - if (NULL == dbusUniqueNameRecord) { COMMONAPI_ERROR(std::string(__FUNCTION__), " no unique name record found for IF: ", dbusInterfaceName, " service: ", dbusServiceName, "object path: ", dbusObjectPath); } - auto& dbusObjectPathsCache = dbusUniqueNameRecord->dbusObjectPathsCache; - auto dbusObjectPathCacheIterator = dbusObjectPathsCache.find(dbusObjectPath); + if(dbusPredefinedServices_.find(dbusServiceName) == dbusPredefinedServices_.end()) { - DBusObjectPathCache* dbusObjectPathCache = NULL; + auto& dbusObjectPathsCache = dbusUniqueNameRecord->dbusObjectPathsCache; + auto dbusObjectPathCacheIterator = dbusObjectPathsCache.find(dbusObjectPath); - if(dbusObjectPathCacheIterator != dbusObjectPathsCache.end()) { - dbusObjectPathCache = &(dbusObjectPathCacheIterator->second); - if (dbusObjectPathCache->state != DBusRecordState::RESOLVED) { - dbusObjectPathCache->state = DBusRecordState::RESOLVING; - dbusServicesMutex_.lock(); + DBusObjectPathCache* dbusObjectPathCache = NULL; + if(dbusObjectPathCacheIterator != dbusObjectPathsCache.end()) { dbusObjectPathCache = &(dbusObjectPathCacheIterator->second); + if (dbusObjectPathCache->state != DBusRecordState::RESOLVED) { + dbusObjectPathCache->state = DBusRecordState::RESOLVING; - std::future<DBusRecordState> futureObjectPathResolved = dbusObjectPathCache->promiseOnResolve.get_future(); - dbusServicesMutex_.unlock(); + dbusObjectPathCache = &(dbusObjectPathCacheIterator->second); - introspectDBusObjectPath(uniqueName, dbusObjectPath); - futureObjectPathResolved.wait_for(timeout); + std::future<DBusRecordState> futureObjectPathResolved = dbusObjectPathCache->promiseOnResolve.get_future(); + dbusServicesMutex_.unlock(); + + resolveObjectPathWithObjectManager(uniqueName, dbusObjectPath); + futureObjectPathResolved.wait_for(timeout); + } else { + dbusServicesMutex_.unlock(); + } } - } - else { - // try to resolve object paths - DBusObjectPathCache newDbusObjectPathCache; - newDbusObjectPathCache.state = DBusRecordState::RESOLVING; - newDbusObjectPathCache.serviceName = dbusServiceName; + else { + // try to resolve object paths + DBusObjectPathCache newDbusObjectPathCache; + newDbusObjectPathCache.state = DBusRecordState::RESOLVING; + newDbusObjectPathCache.serviceName = dbusServiceName; - dbusServicesMutex_.lock(); + dbusObjectPathsCache.insert(std::make_pair(dbusObjectPath, std::move(newDbusObjectPathCache))); - dbusObjectPathsCache.insert(std::make_pair(dbusObjectPath, std::move(newDbusObjectPathCache))); + dbusObjectPathCacheIterator = dbusObjectPathsCache.find(dbusObjectPath); - dbusObjectPathCacheIterator = dbusObjectPathsCache.find(dbusObjectPath); + dbusObjectPathCache = &(dbusObjectPathCacheIterator->second); - dbusObjectPathCache = &(dbusObjectPathCacheIterator->second); + newDbusObjectPathCache.futureOnResolve = dbusObjectPathCache->promiseOnResolve.get_future(); + dbusServicesMutex_.unlock(); - newDbusObjectPathCache.futureOnResolve = dbusObjectPathCache->promiseOnResolve.get_future(); - dbusServicesMutex_.unlock(); + resolveObjectPathWithObjectManager(uniqueName, dbusObjectPath); + newDbusObjectPathCache.futureOnResolve.wait_for(timeout); + } - introspectDBusObjectPath(uniqueName, dbusObjectPath); - newDbusObjectPathCache.futureOnResolve.wait_for(timeout); - } + if (NULL == dbusObjectPathCache) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " no object path cache entry found for IF: ", dbusInterfaceName, + " service: ", dbusServiceName, "object path: ", dbusObjectPath); + } - if (NULL == dbusObjectPathCache) { - COMMONAPI_ERROR(std::string(__FUNCTION__), " no object path cache entry found for IF: ", dbusInterfaceName, - " service: ", dbusServiceName, "object path: ", dbusObjectPath); - } + dbusServicesMutex_.lock(); + if(dbusObjectPathCache->state != DBusRecordState::RESOLVED) { + dbusServicesMutex_.unlock(); + return false; + } - dbusServicesMutex_.lock(); - if(dbusObjectPathCache->state != DBusRecordState::RESOLVED) { + auto dbusInterfaceNamesIterator = dbusObjectPathCache->dbusInterfaceNamesCache.find(dbusInterfaceName); + result = dbusInterfaceNamesIterator != dbusObjectPathCache->dbusInterfaceNamesCache.end(); dbusServicesMutex_.unlock(); - return false; - } - auto dbusInterfaceNamesIterator = dbusObjectPathCache->dbusInterfaceNamesCache.find(dbusInterfaceName); - bool result = dbusInterfaceNamesIterator != dbusObjectPathCache->dbusInterfaceNamesCache.end(); - dbusServicesMutex_.unlock(); + } else { + //service is predefined + result = true; + dbusServicesMutex_.unlock(); + } return(result); } @@ -372,6 +365,7 @@ void DBusServiceRegistry::fetchAllServiceNames() { dbusDaemonProxy_->listNames(callStatus, availableServiceNames); + dbusServicesMutex_.lock(); if (callStatus == CallStatus::SUCCESS) { for(std::string serviceName : availableServiceNames) { if(isDBusServiceName(serviceName)) { @@ -379,114 +373,27 @@ void DBusServiceRegistry::fetchAllServiceNames() { } } } + dbusServicesMutex_.unlock(); } -// d-feet mode -std::vector<std::string> DBusServiceRegistry::getAvailableServiceInstances(const std::string& interfaceName, - const std::string& domainName) { - (void)domainName; - std::vector<std::string> availableServiceInstances; - - // resolve all service names - for (auto serviceNameIterator = dbusServiceNameMap_.begin(); - serviceNameIterator != dbusServiceNameMap_.end(); - serviceNameIterator++) { - - std::string serviceName = serviceNameIterator->first; - DBusUniqueNameRecord* dbusUniqueNameRecord = serviceNameIterator->second; - - if(dbusUniqueNameRecord == NULL) { - DBusServiceListenersRecord& serviceListenerRecord = dbusServiceListenersMap[serviceName]; - if(serviceListenerRecord.uniqueBusNameState != DBusRecordState::RESOLVING) { - resolveDBusServiceName(serviceName, serviceListenerRecord); - } - } - } - - std::mutex mutexResolveAllServices; - std::unique_lock<std::mutex> lockResolveAllServices(mutexResolveAllServices); - std::chrono::milliseconds timeout(5000); - - monitorResolveAllServices_.wait_for(lockResolveAllServices, timeout, [&] { - mutexServiceResolveCount.lock(); - bool finished = servicesToResolve == 0; - mutexServiceResolveCount.unlock(); - - return finished; - }); - - for (auto serviceNameIterator = dbusServiceNameMap_.begin(); - serviceNameIterator != dbusServiceNameMap_.end(); - serviceNameIterator++) { - - std::string serviceName = serviceNameIterator->first; - DBusUniqueNameRecord* dbusUniqueNameRecord = serviceNameIterator->second; - - if(dbusUniqueNameRecord != NULL) { - if(dbusUniqueNameRecord->objectPathsState == DBusRecordState::UNKNOWN) { - DBusObjectPathCache& rootObjectPathCache = dbusUniqueNameRecord->dbusObjectPathsCache["/"]; - if(rootObjectPathCache.state == DBusRecordState::UNKNOWN) { - rootObjectPathCache.state = DBusRecordState::RESOLVING; - introspectDBusObjectPath(dbusUniqueNameRecord->uniqueName, "/"); - } - } - } - } - - std::mutex mutexResolveAllObjectPaths; - std::unique_lock<std::mutex> lockResolveAllObjectPaths(mutexResolveAllObjectPaths); - - // TODO: Check if should use the remaining timeout not "used" during wait before - monitorResolveAllObjectPaths_.wait_for(lockResolveAllObjectPaths, timeout, [&] { - mutexServiceResolveCount.lock(); - bool finished = objectPathsToResolve == 0; - mutexServiceResolveCount.unlock(); - - return finished; - }); - - for (auto serviceNameIterator = dbusServiceNameMap_.begin(); - serviceNameIterator != dbusServiceNameMap_.end(); - serviceNameIterator++) { - - std::string serviceName = serviceNameIterator->first; - DBusUniqueNameRecord* dbusUniqueNameRecord = serviceNameIterator->second; - - if(dbusUniqueNameRecord != NULL) { - if(dbusUniqueNameRecord->objectPathsState == DBusRecordState::RESOLVED) { - for (auto dbusObjectPathCacheIterator = dbusUniqueNameRecord->dbusObjectPathsCache.begin(); - dbusObjectPathCacheIterator != dbusUniqueNameRecord->dbusObjectPathsCache.end(); - dbusObjectPathCacheIterator++) { - if (dbusObjectPathCacheIterator->second.state == DBusRecordState::RESOLVED) { - if (dbusObjectPathCacheIterator->second.dbusInterfaceNamesCache.find(interfaceName) - != dbusObjectPathCacheIterator->second.dbusInterfaceNamesCache.end()) { - std::string commonApiAddress; - translator_->translate( - dbusObjectPathCacheIterator->first + "/" + interfaceName + "/" + serviceName, commonApiAddress); - availableServiceInstances.push_back(commonApiAddress); - } - } - } - } - } - } - - // maybe partial list but it contains everything we know for now - return availableServiceInstances; +void DBusServiceRegistry::getAvailableServiceInstances(const std::string& dbusServiceName, + const std::string& dbusObjectPath, + DBusObjectManagerStub::DBusObjectPathAndInterfacesDict& availableServiceInstances) { + getManagedObjects(dbusServiceName, dbusObjectPath, availableServiceInstances); } -void DBusServiceRegistry::getAvailableServiceInstancesAsync(CommonAPI::Factory::AvailableInstancesCbk_t _cbk, - const std::string &_interface, - const std::string &_domain) { - //Necessary as service discovery might need some time, but the async version of "getAvailableServiceInstances" - //shall return without delay. - std::thread( - [this, _cbk, _interface, _domain](std::shared_ptr<DBusServiceRegistry> selfRef) { - (void)selfRef; - auto instances = getAvailableServiceInstances(_interface, _domain); - _cbk(instances); - }, this->shared_from_this() - ).detach(); +void DBusServiceRegistry::getAvailableServiceInstancesAsync(GetAvailableServiceInstancesCallback callback, + const std::string& dbusServiceName, + const std::string& dbusObjectPath) { + getManagedObjectsAsync(dbusServiceName, dbusObjectPath, [callback](const CallStatus& callStatus, + const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict availableServiceInstances, + const std::string& dbusServiceName, + const std::string& dbusObjectPath) { + (void)callStatus; + (void)dbusServiceName; + (void)dbusObjectPath; + callback(availableServiceInstances); + }); } void DBusServiceRegistry::onSignalDBusMessage(const DBusMessage &_dbusMessage) { @@ -498,7 +405,8 @@ void DBusServiceRegistry::onSignalDBusMessage(const DBusMessage &_dbusMessage) { if (!_dbusMessage.hasInterfaceName("org.freedesktop.DBus.ObjectManager")) { COMMONAPI_ERROR(std::string(__FUNCTION__), " unexpected interface ", _dbusMessage.getInterface()); } - if (!_dbusMessage.hasMemberName("InterfacesAdded") && !_dbusMessage.hasMemberName("InterfacesAdded")) { + if (!_dbusMessage.hasMemberName("InterfacesAdded") && !_dbusMessage.hasMemberName("InterfacesAdded") && + !_dbusMessage.hasMemberName("InterfacesRemoved") && !_dbusMessage.hasMemberName("InterfacesRemoved") ) { COMMONAPI_ERROR(std::string(__FUNCTION__), " unexpected member ", _dbusMessage.getMember()); } @@ -598,10 +506,6 @@ void DBusServiceRegistry::resolveDBusServiceName(const std::string& dbusServiceN COMMONAPI_ERROR(std::string(__FUNCTION__), " unique name not empty ", dbusServiceListenersRecord.uniqueBusName); } - mutexServiceResolveCount.lock(); - servicesToResolve++; - mutexServiceResolveCount.unlock(); - if (dbusDaemonProxy_->isAvailable()) { auto func = std::bind( @@ -648,11 +552,6 @@ void DBusServiceRegistry::onGetNameOwnerCallback(const CallStatus& status, onDBusServiceNotAvailable(dbusServiceListenersRecord, dbusServiceName); } - mutexServiceResolveCount.lock(); - servicesToResolve--; - mutexServiceResolveCount.unlock(); - monitorResolveAllServices_.notify_all(); - dbusServicesMutex_.unlock(); } @@ -788,28 +687,51 @@ bool DBusServiceRegistry::resolveObjectPathWithObjectManager(const std::string& std::placeholders::_2, dbusServiceUniqueName, dbusObjectPath); - return getManagedObjects(dbusServiceUniqueName, "/", getManagedObjectsCallback); + return getManagedObjectsAsync(dbusServiceUniqueName, "/", getManagedObjectsCallback); } -bool DBusServiceRegistry::getManagedObjects(const std::string& dbusServiceUniqueName, - const std::string& dbusObjectPath, - GetManagedObjectsCallback callback) { - bool isSendingInProgress = false; +bool DBusServiceRegistry::getManagedObjects(const std::string& dbusServiceName, + const std::string& dbusObjectPath, + DBusObjectManagerStub::DBusObjectPathAndInterfacesDict& availableServiceInstances) { auto dbusConnection = dbusDaemonProxy_->getDBusConnection(); - if (dbusServiceUniqueName.empty()) { - COMMONAPI_ERROR(std::string(__FUNCTION__), " dbusServiceUniqueName empty"); + if (dbusServiceName.empty()) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " dbusServiceName empty"); } if(dbusConnection->isConnected()) { - if(dbusObjectPath != "/") { - mutexObjectPathsResolveCount.lock(); - objectPathsToResolve++; - mutexObjectPathsResolveCount.unlock(); - } + DBusAddress dbusAddress(dbusServiceName, dbusObjectPath, "org.freedesktop.DBus.ObjectManager"); + DBusMessage dbusMessageCall = CommonAPI::DBus::DBusMessage::createMethodCall( + dbusAddress, + "GetManagedObjects"); + + DBusError error; + CallInfo* defaultCallInfo = new CallInfo(); + DBusMessage reply = dbusConnection->sendDBusMessageWithReplyAndBlock(dbusMessageCall, error, defaultCallInfo); + delete defaultCallInfo; + + DBusInputStream input(reply); + if (!DBusSerializableArguments<DBusObjectManagerStub::DBusObjectPathAndInterfacesDict>::deserialize( + input, availableServiceInstances) || error) + return false; + } + return true; +} + +bool DBusServiceRegistry::getManagedObjectsAsync(const std::string& dbusServiceName, + const std::string& dbusObjectPath, + GetManagedObjectsCallback callback) { + bool isSendingInProgress = false; + auto dbusConnection = dbusDaemonProxy_->getDBusConnection(); + + if (dbusServiceName.empty()) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " dbusServiceName empty"); + } + + if(dbusConnection->isConnected()) { - DBusAddress dbusAddress(dbusServiceUniqueName, dbusObjectPath, "org.freedesktop.DBus.ObjectManager"); + DBusAddress dbusAddress(dbusServiceName, dbusObjectPath, "org.freedesktop.DBus.ObjectManager"); DBusMessage dbusMessageCall = CommonAPI::DBus::DBusMessage::createMethodCall( dbusAddress, "GetManagedObjects"); @@ -818,7 +740,7 @@ bool DBusServiceRegistry::getManagedObjects(const std::string& dbusServiceUnique callback, std::placeholders::_1, std::placeholders::_2, - dbusServiceUniqueName, + dbusServiceName, dbusObjectPath); DBusProxyAsyncCallbackHandler< @@ -840,14 +762,14 @@ bool DBusServiceRegistry::getManagedObjects(const std::string& dbusServiceUnique } void DBusServiceRegistry::onGetManagedObjectsCallbackResolve(const CallStatus& callStatus, - const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict dbusObjectPathAndInterfacesDict, + const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict availableServiceInstances, const std::string& dbusServiceUniqueName, const std::string& dbusObjectPath) { if(callStatus == CallStatus::SUCCESS) { //has object manager bool objectPathFound = false; - for(auto objectPathDict : dbusObjectPathAndInterfacesDict) + for(auto objectPathDict : availableServiceInstances) { std::string objectPath = objectPathDict.first; if(objectPath != dbusObjectPath) @@ -863,16 +785,6 @@ void DBusServiceRegistry::onGetManagedObjectsCallbackResolve(const CallStatus& c processManagedObject(dbusObjectPath, dbusServiceUniqueName, interfaceName); dbusServicesMutex_.unlock(); } - - // resolve further managed objects - auto callback = std::bind( - &DBusServiceRegistry::onGetManagedObjectsCallbackResolveFurther, - this->shared_from_this(), - std::placeholders::_1, - std::placeholders::_2, - std::placeholders::_3, - std::placeholders::_4); - getManagedObjects(dbusServiceUniqueName, dbusObjectPath, callback); } if(!objectPathFound) { @@ -885,193 +797,41 @@ void DBusServiceRegistry::onGetManagedObjectsCallbackResolve(const CallStatus& c dbusServiceUniqueName, dbusObjectPath); std::string objectPathManager = dbusObjectPath.substr(0, dbusObjectPath.find_last_of("\\/")); - getManagedObjects(dbusServiceUniqueName, objectPathManager, getManagedObjectsCallback); + getManagedObjectsAsync(dbusServiceUniqueName, objectPathManager, getManagedObjectsCallback); } } else { COMMONAPI_ERROR("There is no Object Manager that manages " + dbusObjectPath + ". Resolving failed!"); } } -void DBusServiceRegistry::onGetManagedObjectsCallbackResolveFurther(const CallStatus& callStatus, - const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict dbusObjectPathAndInterfacesDict, - const std::string& dbusServiceUniqueName, - const std::string& dbusObjectPath) { - - if(callStatus == CallStatus::SUCCESS) { - for(auto objectPathDict : dbusObjectPathAndInterfacesDict) - { - //resolve - std::string objectPath = objectPathDict.first; - CommonAPI::DBus::DBusObjectManagerStub::DBusInterfacesAndPropertiesDict interfacesAndPropertiesDict = objectPathDict.second; - for(auto interfaceDict : interfacesAndPropertiesDict) - { - std::string interfaceName = interfaceDict.first; - dbusServicesMutex_.lock(); - processManagedObject(objectPath, dbusServiceUniqueName, interfaceName); - dbusServicesMutex_.unlock(); - } - - // resolve further managed objects - auto callback = std::bind( - &DBusServiceRegistry::onGetManagedObjectsCallbackResolveFurther, - this->shared_from_this(), - std::placeholders::_1, - std::placeholders::_2, - std::placeholders::_3, - std::placeholders::_4); - getManagedObjects(dbusServiceUniqueName, objectPath, callback); - } - } else { - // No further managed objects - } - - dbusServicesMutex_.lock(); +void DBusServiceRegistry::processManagedObject(const std::string& dbusObjectPath, + const std::string& dbusServiceUniqueName, + const std::string& interfaceName) { auto dbusServiceUniqueNameIterator = dbusUniqueNamesMap_.find(dbusServiceUniqueName); const bool isDBusServiceUniqueNameFound = (dbusServiceUniqueNameIterator != dbusUniqueNamesMap_.end()); - if (!isDBusServiceUniqueNameFound) { - dbusServicesMutex_.unlock(); + if (!isDBusServiceUniqueNameFound) return; - } DBusUniqueNameRecord& dbusUniqueNameRecord = dbusServiceUniqueNameIterator->second; auto dbusObjectPathIterator = dbusUniqueNameRecord.dbusObjectPathsCache.find(dbusObjectPath); const bool isDBusObjectPathFound = (dbusObjectPathIterator != dbusUniqueNameRecord.dbusObjectPathsCache.end()); - if (!isDBusObjectPathFound) { - dbusServicesMutex_.unlock(); + if (!isDBusObjectPathFound) return; - } DBusObjectPathCache& dbusObjectPathRecord = dbusObjectPathIterator->second; - dbusObjectPathRecord.state = DBusRecordState::RESOLVED; - if(dbusObjectPathRecord.futureOnResolve.valid()) { - dbusObjectPathRecord.promiseOnResolve.set_value(dbusObjectPathRecord.state); - } - mutexObjectPathsResolveCount.lock(); - objectPathsToResolve--; - mutexObjectPathsResolveCount.unlock(); - monitorResolveAllObjectPaths_.notify_all(); - - dbusUniqueNameRecord.objectPathsState = DBusRecordState::RESOLVED; - - notifyDBusServiceListeners( - dbusUniqueNameRecord, - dbusObjectPath, - dbusObjectPathRecord.dbusInterfaceNamesCache, - DBusRecordState::RESOLVED); - - dbusServicesMutex_.unlock(); -} - -void DBusServiceRegistry::processManagedObject(const std::string& dbusObjectPath, - const std::string& dbusServiceUniqueName, - const std::string& interfaceName) { - DBusUniqueNameRecord& dbusUniqueNameRecord = dbusUniqueNamesMap_[dbusServiceUniqueName]; - DBusObjectPathCache& dbusObjectPathCache = dbusUniqueNameRecord.dbusObjectPathsCache[dbusObjectPath]; - if(!isOrgFreedesktopDBusInterface(interfaceName)) { - dbusObjectPathCache.dbusInterfaceNamesCache.insert(interfaceName); + dbusObjectPathRecord.dbusInterfaceNamesCache.insert(interfaceName); } else if (translator_->isOrgFreedesktopDBusPeerMapped() && (interfaceName == "org.freedesktop.DBus.Peer")) { - dbusObjectPathCache.dbusInterfaceNamesCache.insert(interfaceName); - } -} - -bool DBusServiceRegistry::introspectDBusObjectPath(const std::string& dbusServiceUniqueName, - const std::string& dbusObjectPath) { - bool isResolvingInProgress = false; - auto dbusConnection = dbusDaemonProxy_->getDBusConnection(); - - if (dbusServiceUniqueName.empty()) { - COMMONAPI_ERROR(std::string(__FUNCTION__), " dbusServiceUniqueName empty"); + dbusObjectPathRecord.dbusInterfaceNamesCache.insert(interfaceName); } - if (dbusConnection->isConnected()) { - mutexObjectPathsResolveCount.lock(); - objectPathsToResolve++; - mutexObjectPathsResolveCount.unlock(); - - DBusAddress dbusAddress(dbusServiceUniqueName, dbusObjectPath, "org.freedesktop.DBus.Introspectable"); - DBusMessage dbusMessageCall = DBusMessage::createMethodCall( - dbusAddress, - "Introspect"); - auto instrospectAsyncCallback = std::bind( - &DBusServiceRegistry::onIntrospectCallback, - this->shared_from_this(), - std::placeholders::_1, - std::placeholders::_2, - dbusServiceUniqueName, - dbusObjectPath); - - DBusProxyAsyncCallbackHandler< - DBusServiceRegistry, - std::string - >::Delegate delegate(shared_from_this(), instrospectAsyncCallback); - - dbusConnection->sendDBusMessageWithReplyAsync( - dbusMessageCall, - DBusProxyAsyncCallbackHandler< - DBusServiceRegistry, - std::string - >::create(delegate, std::tuple<std::string>()), - &serviceRegistryInfo); - - isResolvingInProgress = true; - } - return isResolvingInProgress; -} - -/** - * Callback for org.freedesktop.DBus.Introspectable.Introspect - * - * This is the other end of checking if a dbus object path is available. - * On success it'll extract all interface names from the xml data response. - * Special interfaces that start with "org.freedesktop.DBus." will be ignored. - * - * @param status - * @param xmlData - * @param dbusServiceUniqueName - * @param dbusObjectPath - */ -void DBusServiceRegistry::onIntrospectCallback(const CallStatus& callStatus, - std::string xmlData, - const std::string& dbusServiceUniqueName, - const std::string& dbusObjectPath) { - if (callStatus == CallStatus::SUCCESS) { - parseIntrospectionData(xmlData, dbusObjectPath, dbusServiceUniqueName); - } - - dbusServicesMutex_.lock(); - - // Error CallStatus will result in empty parsedDBusInterfaceNameSet (and not available notification) - - auto dbusServiceUniqueNameIterator = dbusUniqueNamesMap_.find(dbusServiceUniqueName); - const bool isDBusServiceUniqueNameFound = (dbusServiceUniqueNameIterator != dbusUniqueNamesMap_.end()); - - if (!isDBusServiceUniqueNameFound) { - dbusServicesMutex_.unlock(); - return; - } - - DBusUniqueNameRecord& dbusUniqueNameRecord = dbusServiceUniqueNameIterator->second; - auto dbusObjectPathIterator = dbusUniqueNameRecord.dbusObjectPathsCache.find(dbusObjectPath); - const bool isDBusObjectPathFound = (dbusObjectPathIterator != dbusUniqueNameRecord.dbusObjectPathsCache.end()); - - if (!isDBusObjectPathFound) { - dbusServicesMutex_.unlock(); - return; - } - - DBusObjectPathCache& dbusObjectPathRecord = dbusObjectPathIterator->second; - dbusObjectPathRecord.state = DBusRecordState::RESOLVED; - dbusObjectPathRecord.promiseOnResolve.set_value(dbusObjectPathRecord.state); - mutexObjectPathsResolveCount.lock(); - objectPathsToResolve--; - mutexObjectPathsResolveCount.unlock(); - monitorResolveAllObjectPaths_.notify_all(); + if(dbusObjectPathRecord.futureOnResolve.valid()) + dbusObjectPathRecord.promiseOnResolve.set_value(dbusObjectPathRecord.state); dbusUniqueNameRecord.objectPathsState = DBusRecordState::RESOLVED; @@ -1080,104 +840,6 @@ void DBusServiceRegistry::onIntrospectCallback(const CallStatus& callStatus, dbusObjectPath, dbusObjectPathRecord.dbusInterfaceNamesCache, DBusRecordState::RESOLVED); - - dbusServicesMutex_.unlock(); -} - -void DBusServiceRegistry::parseIntrospectionNode(const pugi::xml_node& node, const std::string& rootObjectPath, const std::string& fullObjectPath, const std::string& dbusServiceUniqueName) { - std::string nodeName; - - for(pugi::xml_node& subNode : node.children()) { - nodeName = std::string(subNode.name()); - - if(nodeName == "node") { - processIntrospectionObjectPath(subNode, rootObjectPath, dbusServiceUniqueName); - } - - if(nodeName == "interface") { - processIntrospectionInterface(subNode, rootObjectPath, fullObjectPath, dbusServiceUniqueName); - } - } -} - -void DBusServiceRegistry::processIntrospectionObjectPath(const pugi::xml_node& node, const std::string& rootObjectPath, const std::string& dbusServiceUniqueName) { - std::string fullObjectPath = rootObjectPath; - - if(fullObjectPath.at(fullObjectPath.length()-1) != '/') { - fullObjectPath += "/"; - } - - fullObjectPath += std::string(node.attribute("name").as_string()); - - DBusUniqueNameRecord& dbusUniqueNameRecord = dbusUniqueNamesMap_[dbusServiceUniqueName]; - DBusObjectPathCache& dbusObjectPathCache = dbusUniqueNameRecord.dbusObjectPathsCache[fullObjectPath]; - - if(dbusObjectPathCache.state == DBusRecordState::UNKNOWN) { - dbusObjectPathCache.state = DBusRecordState::RESOLVING; - introspectDBusObjectPath(dbusServiceUniqueName, fullObjectPath); - } - - for(pugi::xml_node subNode : node.children()) { - parseIntrospectionNode(subNode, fullObjectPath, fullObjectPath, dbusServiceUniqueName); - } -} - -void DBusServiceRegistry::processIntrospectionInterface(const pugi::xml_node& node, const std::string& rootObjectPath, const std::string& fullObjectPath, const std::string& dbusServiceUniqueName) { - std::string interfaceName = node.attribute("name").as_string(); - DBusUniqueNameRecord& dbusUniqueNameRecord = dbusUniqueNamesMap_[dbusServiceUniqueName]; - DBusObjectPathCache& dbusObjectPathCache = dbusUniqueNameRecord.dbusObjectPathsCache[fullObjectPath]; - - if(!isOrgFreedesktopDBusInterface(interfaceName)) { - dbusObjectPathCache.dbusInterfaceNamesCache.insert(interfaceName); - } else if (translator_->isOrgFreedesktopDBusPeerMapped() && (interfaceName == "org.freedesktop.DBus.Peer")) { - dbusObjectPathCache.dbusInterfaceNamesCache.insert(interfaceName); - } - - for(pugi::xml_node subNode : node.children()) { - parseIntrospectionNode(subNode, rootObjectPath, fullObjectPath, dbusServiceUniqueName); - } -} - -void DBusServiceRegistry::parseIntrospectionData(const std::string& xmlData, - const std::string& rootObjectPath, - const std::string& dbusServiceUniqueName) { - pugi::xml_document xmlDocument; - pugi::xml_parse_result parsedResult = xmlDocument.load_buffer(xmlData.c_str(), xmlData.length(), pugi::parse_minimal, pugi::encoding_utf8); - - if(parsedResult.status != pugi::xml_parse_status::status_ok) { - return; - } - - const pugi::xml_node rootNode = xmlDocument.child("node"); - - dbusServicesMutex_.lock(); - - parseIntrospectionNode(rootNode, rootObjectPath, rootObjectPath, dbusServiceUniqueName); - - dbusUniqueNamesMap_[dbusServiceUniqueName]; - dbusServicesMutex_.unlock(); -} - - -void DBusServiceRegistry::onDBusDaemonProxyStatusEvent(const AvailabilityStatus& availabilityStatus) { - if (availabilityStatus == AvailabilityStatus::UNKNOWN) { - COMMONAPI_ERROR(std::string(__FUNCTION__), " unexpected availability status ", int(availabilityStatus)); - } - - dbusServicesMutex_.lock(); - - for (auto& dbusServiceListenersIterator : dbusServiceListenersMap) { - const auto& dbusServiceName = dbusServiceListenersIterator.first; - auto& dbusServiceListenersRecord = dbusServiceListenersIterator.second; - - if (availabilityStatus == AvailabilityStatus::AVAILABLE) { - resolveDBusServiceName(dbusServiceName, dbusServiceListenersRecord); - } else { - onDBusServiceNotAvailable(dbusServiceListenersRecord, dbusServiceName); - } - } - - dbusServicesMutex_.unlock(); } void DBusServiceRegistry::checkDBusServiceWasAvailable(const std::string& dbusServiceName, @@ -1378,6 +1040,9 @@ void DBusServiceRegistry::notifyDBusObjectPathChanged(DBusInterfaceNameListeners auto& dbusInterfaceNameListenersRecord = dbusInterfaceNameListenersIterator->second; notifyDBusInterfaceNameListeners(dbusInterfaceNameListenersRecord, isDBusInterfaceNameAvailable); + + if (dbusInterfaceNameListenersRecord.listenerList.empty()) + dbusInterfaceNameListenersMap.erase(dbusInterfaceNameListenersIterator); } } } @@ -1394,10 +1059,20 @@ void DBusServiceRegistry::notifyDBusInterfaceNameListeners(DBusInterfaceNameList } dbusInterfaceNameListenersRecord.state = notifyState; - for (auto dbusServiceListenerIterator = dbusInterfaceNameListenersRecord.listenerList.begin(); - dbusServiceListenerIterator != dbusInterfaceNameListenersRecord.listenerList.end(); - dbusServiceListenerIterator++) { - (*dbusServiceListenerIterator)(availabilityStatus); + auto dbusServiceListenerIterator = dbusInterfaceNameListenersRecord.listenerList.begin(); + while (dbusServiceListenerIterator != dbusInterfaceNameListenersRecord.listenerList.end()) { + + auto itsRemoveListenerIt = std::find(dbusInterfaceNameListenersRecord.listenersToRemove.begin(), + dbusInterfaceNameListenersRecord.listenersToRemove.end(), + dbusServiceListenerIterator); + + if(itsRemoveListenerIt != dbusInterfaceNameListenersRecord.listenersToRemove.end()) { + dbusInterfaceNameListenersRecord.listenersToRemove.remove(dbusServiceListenerIterator); + dbusServiceListenerIterator = dbusInterfaceNameListenersRecord.listenerList.erase(dbusServiceListenerIterator); + } else { + (*dbusServiceListenerIterator)(availabilityStatus); + ++dbusServiceListenerIterator; + } } } diff --git a/src/CommonAPI/DBus/DBusStubAdapter.cpp b/src/CommonAPI/DBus/DBusStubAdapter.cpp index 422a419..0f86cc1 100644 --- a/src/CommonAPI/DBus/DBusStubAdapter.cpp +++ b/src/CommonAPI/DBus/DBusStubAdapter.cpp @@ -16,12 +16,11 @@ DBusStubAdapter::DBusStubAdapter(const DBusAddress &_dbusAddress, : dbusAddress_(_dbusAddress), connection_(_connection), isManaging_(_isManaging) { - Factory::get()->incrementConnection(connection_); } DBusStubAdapter::~DBusStubAdapter() { deinit(); - Factory::get()->decrementConnection(connection_); + Factory::get()->unregisterStub(address_.getDomain(), address_.getInterface(), address_.getInstance()); } void DBusStubAdapter::init(std::shared_ptr<DBusStubAdapter> _instance) { diff --git a/src/test/DBusConnectionTest.cpp b/src/test/DBusConnectionTest.cpp index c943e5f..8882532 100644 --- a/src/test/DBusConnectionTest.cpp +++ b/src/test/DBusConnectionTest.cpp @@ -13,7 +13,9 @@ #include <gtest/gtest.h> #include <dbus/dbus.h> +#include <chrono> #include <cstring> +#include <thread> bool replyArrived; @@ -62,19 +64,19 @@ TEST_F(DBusConnectionTest, ConnectionStatusEventWorks) { std::placeholders::_1)); ASSERT_FALSE(dbusConnection_->isConnected()); - ASSERT_EQ(connectionStatusEventCount, 0); + ASSERT_EQ(connectionStatusEventCount, 0u); uint32_t expectedEventCount = 0; while (expectedEventCount < 10) { ASSERT_TRUE(dbusConnection_->connect()); ASSERT_TRUE(dbusConnection_->isConnected()); - usleep(20000); + std::this_thread::sleep_for(std::chrono::microseconds(20000)); ASSERT_EQ(connectionStatusEventCount, ++expectedEventCount); ASSERT_EQ(connectionStatus, CommonAPI::AvailabilityStatus::AVAILABLE); dbusConnection_->disconnect(); ASSERT_FALSE(dbusConnection_->isConnected()); - usleep(20000); + std::this_thread::sleep_for(std::chrono::microseconds(20000)); ASSERT_EQ(connectionStatusEventCount, ++expectedEventCount); ASSERT_EQ(connectionStatus, CommonAPI::AvailabilityStatus::NOT_AVAILABLE); } @@ -140,7 +142,7 @@ TEST_F(DBusConnectionTest, SendingAsyncDBusMessagesWorks) { &CommonAPI::DBus::defaultCallInfo); for (int i = 0; i < 100; i++) { - usleep(10); + std::this_thread::sleep_for(std::chrono::microseconds(10)); } ASSERT_EQ(serviceHandlerDBusMessageCount, expectedDBusMessageCount); @@ -262,7 +264,7 @@ TEST_F(DBusConnectionTest, LibdbusConnectionsMayCommitSuicide) { std::unique_lock<std::mutex> dispatchLock(dispatchMutex); while(!dispatchReady) { dispatchLock.unlock(); - usleep(100000 * 5); + std::this_thread::sleep_for(std::chrono::microseconds((100000 * 5))); dispatchLock.lock(); } } diff --git a/src/test/DBusDaemonProxyTest.cpp b/src/test/DBusDaemonProxyTest.cpp index cc17126..b2a4364 100644 --- a/src/test/DBusDaemonProxyTest.cpp +++ b/src/test/DBusDaemonProxyTest.cpp @@ -41,10 +41,10 @@ TEST_F(DBusDaemonProxyTest, ListNames) { dbusDaemonProxy_->listNames(callStatus, busNames); ASSERT_EQ(callStatus, CommonAPI::CallStatus::SUCCESS); - ASSERT_GT(busNames.size(), 0); + ASSERT_GT(busNames.size(), 0u); for (const std::string& busName : busNames) { ASSERT_FALSE(busName.empty()); - ASSERT_GT(busName.length(), 1); + ASSERT_GT(busName.length(), 1u); } } @@ -73,10 +73,10 @@ TEST_F(DBusDaemonProxyTest, ListNamesAsync) { ASSERT_EQ(callStatus, CommonAPI::CallStatus::SUCCESS); - ASSERT_GT(busNames.size(), 0); + ASSERT_GT(busNames.size(), 0u); for (const std::string& busName : busNames) { ASSERT_FALSE(busName.empty()); - ASSERT_GT(busName.length(), 1); + ASSERT_GT(busName.length(), 1u); } } |