summaryrefslogtreecommitdiff
path: root/src/CommonAPI
diff options
context:
space:
mode:
Diffstat (limited to 'src/CommonAPI')
-rw-r--r--src/CommonAPI/DBus/DBusAddressTranslator.cpp39
-rw-r--r--src/CommonAPI/DBus/DBusConnection.cpp235
-rw-r--r--src/CommonAPI/DBus/DBusDaemonProxy.cpp10
-rw-r--r--src/CommonAPI/DBus/DBusFactory.cpp206
-rw-r--r--src/CommonAPI/DBus/DBusInstanceAvailabilityStatusChangedEvent.cpp269
-rw-r--r--src/CommonAPI/DBus/DBusMainLoopContext.cpp117
-rw-r--r--src/CommonAPI/DBus/DBusOutputStream.cpp9
-rw-r--r--src/CommonAPI/DBus/DBusProxy.cpp98
-rw-r--r--src/CommonAPI/DBus/DBusProxyManager.cpp142
-rw-r--r--src/CommonAPI/DBus/DBusServiceRegistry.cpp599
-rw-r--r--src/CommonAPI/DBus/DBusStubAdapter.cpp3
11 files changed, 886 insertions, 841 deletions
diff --git a/src/CommonAPI/DBus/DBusAddressTranslator.cpp b/src/CommonAPI/DBus/DBusAddressTranslator.cpp
index cb208c0..8f67aa2 100644
--- a/src/CommonAPI/DBus/DBusAddressTranslator.cpp
+++ b/src/CommonAPI/DBus/DBusAddressTranslator.cpp
@@ -67,9 +67,10 @@ DBusAddressTranslator::translate(const CommonAPI::Address &_key, DBusAddress &_v
_value = it->second;
} else if (isDefault_) {
std::string interfaceName(_key.getInterface());
+ std::replace(interfaceName.begin(), interfaceName.end(), ':', '.');
std::string objectPath("/" + _key.getInstance());
std::replace(objectPath.begin(), objectPath.end(), '.', '/');
- std::string service(_key.getInterface() + "_" + _key.getInstance());
+ std::string service(interfaceName + "_" + _key.getInstance());
if (isValid(service, '.', false, false, true)
&& isValid(objectPath, '/', true)
@@ -81,6 +82,11 @@ DBusAddressTranslator::translate(const CommonAPI::Address &_key, DBusAddress &_v
forwards_.insert({ _key, _value });
backwards_.insert({ _value, _key });
}
+ else {
+ COMMONAPI_ERROR(
+ "Translation from CommonAPI address to DBus address failed!");
+ result = false;
+ }
} else {
result = false;
}
@@ -101,6 +107,9 @@ DBusAddressTranslator::translate(const DBusAddress &_key, std::string &_value) {
bool
DBusAddressTranslator::translate(const DBusAddress &_key, CommonAPI::Address &_value) {
bool result(true);
+ std::size_t itsInterfacePos;
+ std::string itsVersion;
+ bool isValidVersion(true);
std::lock_guard<std::mutex> itsLock(mutex_);
const auto it = backwards_.find(_key);
@@ -109,6 +118,34 @@ DBusAddressTranslator::translate(const DBusAddress &_key, CommonAPI::Address &_v
} else if (isDefault_) {
if (isValid(_key.getObjectPath(), '/', true) && isValid(_key.getInterface(), '.')) {
std::string interfaceName(_key.getInterface());
+ itsInterfacePos = interfaceName.rfind('.');
+ itsInterfacePos++;
+ if( itsInterfacePos != std::string::npos
+ && ( interfaceName.length() - itsInterfacePos >= 4) ) {
+ itsVersion = interfaceName.substr(itsInterfacePos);
+ if( itsVersion != "" ) {
+ std::size_t itsSeparatorPos = itsVersion.find('_');
+ if (itsSeparatorPos == std::string::npos) {
+ isValidVersion = false;
+ }
+ if(isValidVersion) {
+ if( *(itsVersion.begin()) != 'v') {
+ isValidVersion = false;
+ }
+ if(isValidVersion) {
+ for (auto it = itsVersion.begin()+1; it != itsVersion.end(); ++it) {
+ if (!isdigit(*it) && *it != '_') {
+ isValidVersion = false;
+ break;
+ }
+ }
+ }
+ if( isValidVersion ) {
+ interfaceName.replace(itsInterfacePos - 1, 1, ":");
+ }
+ }
+ }
+ }
std::string instance(_key.getObjectPath().substr(1));
std::replace(instance.begin(), instance.end(), '/', '.');
diff --git a/src/CommonAPI/DBus/DBusConnection.cpp b/src/CommonAPI/DBus/DBusConnection.cpp
index 6071830..23967a9 100644
--- a/src/CommonAPI/DBus/DBusConnection.cpp
+++ b/src/CommonAPI/DBus/DBusConnection.cpp
@@ -20,6 +20,22 @@
namespace CommonAPI {
namespace DBus {
+void MsgReplyQueueEntry::process(std::shared_ptr<DBusConnection> _connection) {
+ _connection->dispatchDBusMessageReply(message_, replyAsyncHandler_);
+}
+
+void MsgReplyQueueEntry::clear() {
+ delete replyAsyncHandler_;
+}
+
+void MsgQueueEntry::process(std::shared_ptr<DBusConnection> _connection) {
+ (void)_connection;
+}
+
+void MsgQueueEntry::clear() {
+
+}
+
DBusConnectionStatusEvent::DBusConnectionStatusEvent(DBusConnection* dbusConnection):
dbusConnection_(dbusConnection) {
}
@@ -52,6 +68,7 @@ DBusConnection::DBusConnection(DBusType_t busType,
dispatchThread_(NULL),
dispatchSource_(),
watchContext_(NULL),
+ timeoutContext_(NULL),
connection_(NULL),
busType_(busType),
dbusConnectionStatusEvent_(this),
@@ -74,6 +91,7 @@ DBusConnection::DBusConnection(::DBusConnection *_connection,
dispatchThread_(NULL),
dispatchSource_(),
watchContext_(NULL),
+ timeoutContext_(NULL),
connection_(_connection),
busType_(DBusType_t::WRAPPED),
dbusConnectionStatusEvent_(this),
@@ -86,7 +104,7 @@ DBusConnection::DBusConnection(::DBusConnection *_connection,
activeConnections_(0),
isDisconnecting_(false),
isDispatching_(false),
- isWaitingOnFinishedDispatching_(false){
+ isWaitingOnFinishedDispatching_(false) {
dbus_threads_init_default();
}
@@ -106,10 +124,11 @@ DBusConnection::~DBusConnection() {
dbus_connection_set_timeout_functions(connection_, NULL, NULL, NULL, NULL, NULL);
}
- lockedContext->deregisterWatch(msgWatch_);
- lockedContext->deregisterDispatchSource(msgDispatchSource_);
+ lockedContext->deregisterWatch(queueWatch_);
+ lockedContext->deregisterDispatchSource(queueDispatchSource_);
lockedContext->deregisterDispatchSource(dispatchSource_);
delete watchContext_;
+ delete timeoutContext_;
}
// ensure, the registry survives until disconnecting is done...
@@ -170,21 +189,26 @@ DBusConnection::~DBusConnection() {
}
}
-
bool DBusConnection::attachMainLoopContext(std::weak_ptr<MainLoopContext> mainLoopContext) {
mainLoopContext_ = mainLoopContext;
if (auto lockedContext = mainLoopContext_.lock()) {
- msgWatch_ = new DBusMessageWatch(shared_from_this());
- msgDispatchSource_ = new DBusMessageDispatchSource(msgWatch_);
+ queueWatch_ = new DBusQueueWatch(shared_from_this());
+ queueDispatchSource_ = new DBusQueueDispatchSource(queueWatch_);
- lockedContext->registerDispatchSource(msgDispatchSource_);
- lockedContext->registerWatch(msgWatch_);
+ lockedContext->registerDispatchSource(queueDispatchSource_);
+ lockedContext->registerWatch(queueWatch_);
dispatchSource_ = new DBusDispatchSource(this);
watchContext_ = new WatchContext(mainLoopContext_, dispatchSource_, shared_from_this());
+ timeoutContext_ = new TimeoutContext(mainLoopContext_, shared_from_this());
lockedContext->registerDispatchSource(dispatchSource_);
+ if (!isConnected()) {
+ COMMONAPI_ERROR(std::string(__FUNCTION__), "not connected");
+ return false;
+ }
+
dbus_connection_set_wakeup_main_function(
connection_,
&DBusConnection::onWakeupMainContext,
@@ -208,7 +232,7 @@ bool DBusConnection::attachMainLoopContext(std::weak_ptr<MainLoopContext> mainLo
&DBusConnection::onAddTimeout,
&DBusConnection::onRemoveTimeout,
&DBusConnection::onToggleTimeout,
- &mainLoopContext_,
+ timeoutContext_,
NULL);
if (!success) {
@@ -297,13 +321,13 @@ void DBusConnection::onToggleWatch(::DBusWatch* libdbusWatch, void* data) {
dbus_bool_t DBusConnection::onAddTimeout(::DBusTimeout* libdbusTimeout, void* data) {
- std::weak_ptr<MainLoopContext>* mainloop = static_cast<std::weak_ptr<MainLoopContext>*>(data);
- if (NULL == mainloop) {
- COMMONAPI_ERROR(std::string(__FUNCTION__), "mainloop == NULL");
+ TimeoutContext* timeoutContext = static_cast<TimeoutContext*>(data);
+ if (NULL == timeoutContext) {
+ COMMONAPI_ERROR(std::string(__FUNCTION__), "timeoutContext == NULL");
return FALSE;
}
- DBusTimeout* dbusTimeout = new DBusTimeout(libdbusTimeout, *mainloop);
+ DBusTimeout* dbusTimeout = new DBusTimeout(libdbusTimeout, timeoutContext->mainLoopContext_, timeoutContext->dbusConnection_);
dbus_timeout_set_data(libdbusTimeout, dbusTimeout, NULL);
if (dbusTimeout->isReadyToBeMonitored()) {
@@ -364,11 +388,11 @@ bool DBusConnection::connect(DBusError &dbusError, bool startDispatchThread) {
#ifdef _MSC_VER
COMMONAPI_ERROR(std::string(__FUNCTION__) +
": Name: " + dbusError.getName() +
- " Message: " + dbusError.getMessage())
+ " Message: " + dbusError.getMessage());
#else
COMMONAPI_ERROR(std::string(__PRETTY_FUNCTION__) +
": Name: " + dbusError.getName() +
- " Message: " + dbusError.getMessage())
+ " Message: " + dbusError.getMessage());
#endif
return false;
}
@@ -404,8 +428,15 @@ void DBusConnection::disconnect() {
std::lock_guard<std::mutex> dbusConnectionLock(connectionGuard_);
std::unique_lock<std::mutex> dispatchLock(dispatchMutex_);
+
+ std::shared_ptr<DBusServiceRegistry> itsRegistry = DBusServiceRegistry::get(shared_from_this());
+
isDisconnecting_ = true;
+ if (std::shared_ptr<CommonAPI::MainLoopContext> mainLoopContext = mainLoopContext_.lock()) {
+ Factory::get()->releaseConnection(connectionId_);
+ }
+
if (isConnected()) {
dbusConnectionStatusEvent_.notifyListeners(AvailabilityStatus::NOT_AVAILABLE);
@@ -457,10 +488,6 @@ void DBusConnection::disconnect() {
dbus_connection_unref(connection_);
connection_ = nullptr;
}
-
- if (std::shared_ptr<CommonAPI::MainLoopContext> mainLoopContext = mainLoopContext_.lock()) {
- Factory::get()->releaseConnection(connectionId_, mainLoopContext.get());
- }
}
bool DBusConnection::isConnected() const {
@@ -504,11 +531,11 @@ bool DBusConnection::requestServiceNameAndBlock(const std::string& serviceName)
#ifdef _MSC_VER // Visual Studio
COMMONAPI_ERROR(std::string(__FUNCTION__) +
": Name: " + dbusError.getName() +
- " Message: " + dbusError.getMessage())
+ " Message: " + dbusError.getMessage());
#else
COMMONAPI_ERROR(std::string(__PRETTY_FUNCTION__) +
": Name: " + dbusError.getName() +
- " Message: " + dbusError.getMessage())
+ " Message: " + dbusError.getMessage());
#endif
}
}
@@ -594,10 +621,11 @@ void DBusConnection::onLibdbusPendingCall(::DBusPendingCall* _libdbusPendingCall
_dbusMessageReplyAsyncHandler->lock();
// libdbus calls the cleanup method below
- if(_libdbusPendingCall)
+ if(_libdbusPendingCall && processAsyncHandler) {
dbus_pending_call_unref(_libdbusPendingCall);
+ _dbusMessageReplyAsyncHandler->setExecutionFinished();
+ }
- _dbusMessageReplyAsyncHandler->setExecutionFinished();
if (_dbusMessageReplyAsyncHandler->hasToBeDeleted()) {
_dbusMessageReplyAsyncHandler->unlock();
delete _dbusMessageReplyAsyncHandler;
@@ -619,7 +647,6 @@ void DBusConnection::onLibdbusPendingCallNotifyThunk(::DBusPendingCall* _libdbus
auto pendingCallNotificationData = reinterpret_cast<PendingCallNotificationData*>(_userData);
auto dbusMessageReplyAsyncHandler = pendingCallNotificationData->replyAsyncHandler_;
auto dbusConnection = pendingCallNotificationData->dbusConnection_;
- delete pendingCallNotificationData;
DBusMessage dbusMessage = DBusConnection::convertToDBusMessage(_libdbusPendingCall);
@@ -627,9 +654,8 @@ void DBusConnection::onLibdbusPendingCallNotifyThunk(::DBusPendingCall* _libdbus
}
void DBusConnection::onLibdbusDataCleanup(void *_data) {
- // Dummy method -> deleting of userData is not executed in this method. Deleting is
- // executed by handling of the timeouts.
- (void)_data;
+ auto pendingCallNotificationData = reinterpret_cast<PendingCallNotificationData*>(_data);
+ delete pendingCallNotificationData;
}
//Would not be needed if libdbus would actually handle its timeouts for pending calls.
@@ -648,7 +674,7 @@ void DBusConnection::enforceAsynchronousTimeouts() const {
auto minTimeout = std::get<0>(minTimeoutElement->second);
- std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now();
+ std::chrono::steady_clock::time_point now = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now();
timeout = (int)std::chrono::duration_cast<std::chrono::milliseconds>(minTimeout - now).count();
}
@@ -662,7 +688,7 @@ void DBusConnection::enforceAsynchronousTimeouts() const {
enforceTimeoutMutex_.lock();
auto it = timeoutMap_.begin();
while (it != timeoutMap_.end()) {
- std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now();
+ std::chrono::steady_clock::time_point now = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now();
DBusMessageReplyAsyncHandler* asyncHandler = std::get<1>(it->second);
DBusPendingCall* libdbusPendingCall = it->first;
@@ -683,7 +709,7 @@ void DBusConnection::enforceAsynchronousTimeouts() const {
if (executionStarted && !executionFinished) {
// execution of asyncHandler is still running
// ==> add 100 ms for next timeout check
- std::get<0>(it->second) = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(100);
+ std::get<0>(it->second) = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now() + std::chrono::milliseconds(100);
} else {
if (!executionFinished) {
// execution of asyncHandler was not finished (and not started)
@@ -729,6 +755,7 @@ void DBusConnection::enforceAsynchronousTimeouts() const {
}
enforceTimeoutMutex_.unlock();
} else {
+
std::lock_guard<std::mutex> itsLock(enforceTimeoutMutex_);
auto it = timeoutMap_.begin();
@@ -768,18 +795,20 @@ void DBusConnection::enforceAsynchronousTimeouts() const {
}
}
-std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync(
+bool DBusConnection::sendDBusMessageWithReplyAsync(
const DBusMessage& dbusMessage,
std::unique_ptr<DBusMessageReplyAsyncHandler> dbusMessageReplyAsyncHandler,
const CommonAPI::CallInfo *_info) const {
+ std::lock_guard<std::mutex> dbusConnectionLock(connectionGuard_);
+
if (!dbusMessage) {
COMMONAPI_ERROR(std::string(__FUNCTION__), "message == NULL");
- return std::future<CallStatus>();
+ return false;
}
if (!isConnected()) {
COMMONAPI_ERROR(std::string(__FUNCTION__), "not connected");
- return std::future<CallStatus>();
+ return false;
}
DBusPendingCall* libdbusPendingCall;
@@ -787,14 +816,8 @@ std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync(
DBusMessageReplyAsyncHandler* replyAsyncHandler = dbusMessageReplyAsyncHandler.release();
- std::future<CallStatus> callStatusFuture;
- try {
- callStatusFuture = replyAsyncHandler->getFuture();
- } catch (std::exception& e) {
- (void)e;
- }
-
PendingCallNotificationData* userData = new PendingCallNotificationData(this, replyAsyncHandler);
+ DBusTimeout::currentTimeout_ = NULL;
libdbusSuccess = dbus_connection_send_with_reply_set_notify(connection_,
dbusMessage.message_,
@@ -811,10 +834,10 @@ std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync(
if (!libdbusSuccess || !libdbusPendingCall) {
#ifdef _MSC_VER // Visual Studio
COMMONAPI_ERROR(std::string(__FUNCTION__) +
- ": (!libdbusSuccess || !libdbusPendingCall) == true")
+ ": (!libdbusSuccess || !libdbusPendingCall) == true");
#else
COMMONAPI_ERROR(std::string(__PRETTY_FUNCTION__) +
- ": (!libdbusSuccess || !libdbusPendingCall) == true")
+ ": (!libdbusSuccess || !libdbusPendingCall) == true");
#endif
if (libdbusPendingCall) {
dbus_pending_call_unref(libdbusPendingCall);
@@ -827,13 +850,13 @@ std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync(
nullptr));
}
mainLoopContext_.lock()->wakeup();
- return callStatusFuture;
+ return true;
}
if (_info->timeout_ != DBUS_TIMEOUT_INFINITE) {
- auto timeoutPoint = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(_info->timeout_);
+ auto timeoutPoint = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now() + std::chrono::milliseconds(_info->timeout_);
std::tuple<
- std::chrono::time_point<std::chrono::high_resolution_clock>,
+ std::chrono::steady_clock::time_point,
DBusMessageReplyAsyncHandler*,
DBusMessage> toInsert {
timeoutPoint,
@@ -841,11 +864,20 @@ std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync(
dbusMessage
};
+ if(DBusTimeout::currentTimeout_)
+ DBusTimeout::currentTimeout_->setPendingCall(libdbusPendingCall);
+
enforceTimeoutMutex_.lock();
auto ret = timeoutMap_.insert( { libdbusPendingCall, toInsert } );
if (ret.second == false) {
// key has been reused
// update the map value with the new info
+ DBusMessageReplyAsyncHandler* asyncHandler;
+ auto it = timeoutMap_.find(ret.first->first);
+ if(it != timeoutMap_.end()) {
+ asyncHandler = std::get<1>(it->second);
+ delete asyncHandler;
+ }
timeoutMap_.erase(ret.first);
timeoutMap_.insert( { libdbusPendingCall, toInsert } );
}
@@ -860,7 +892,7 @@ std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync(
timeoutInfiniteAsyncHandlers_.insert(replyAsyncHandler);
}
- return callStatusFuture;
+ return true;
}
DBusMessage DBusConnection::sendDBusMessageWithReplyAndBlock(const DBusMessage& dbusMessage,
@@ -902,17 +934,24 @@ void DBusConnection::dispatchDBusMessageReply(const DBusMessage& _reply,
}
bool DBusConnection::singleDispatch() {
+ std::list<MainloopTimeout_t> mainloopTimeouts;
{
- std::lock_guard<std::mutex> itsLock(mainloopTimeoutsMutex_);
+ mainloopTimeoutsMutex_.lock();
for (auto t : mainloopTimeouts_) {
- std::get<0>(t)->onDBusMessageReply(std::get<2>(t), std::get<1>(t));
- if (std::get<3>(t) != nullptr) {
- dbus_pending_call_unref(std::get<3>(t));
- }
- delete std::get<0>(t);
+ mainloopTimeouts.push_back(t);
}
mainloopTimeouts_.clear();
+ mainloopTimeoutsMutex_.unlock();
}
+
+ for (auto t : mainloopTimeouts) {
+ std::get<0>(t)->onDBusMessageReply(std::get<2>(t), std::get<1>(t));
+ if (std::get<3>(t) != nullptr) {
+ dbus_pending_call_unref(std::get<3>(t));
+ }
+ delete std::get<0>(t);
+ }
+
if(setDispatching(true)) {
bool dispatchStatus(connection_ && dbus_connection_dispatch(connection_) == DBUS_DISPATCH_DATA_REMAINS);
setDispatching(false);
@@ -948,10 +987,13 @@ void DBusConnection::incrementConnection() {
}
void DBusConnection::decrementConnection() {
- std::lock_guard < std::mutex > lock(activeConnectionsMutex_);
- activeConnections_--;
+ int activeConnections = 0;
+ {
+ std::lock_guard < std::mutex > lock(activeConnectionsMutex_);
+ activeConnections = --activeConnections_;
+ }
- if (activeConnections_ <= 0) {
+ if (activeConnections <= 0) {
disconnect();
}
}
@@ -978,19 +1020,25 @@ bool DBusConnection::setDispatching(bool _isDispatching) {
}
}
-void DBusConnection::sendPendingSelectiveSubscription(DBusProxy* callingProxy, std::string methodName,
- DBusSignalHandler* dbusSignalHandler, uint32_t tag) {
+void DBusConnection::sendPendingSelectiveSubscription(DBusProxy* callingProxy, std::string interfaceMemberName,
+ DBusSignalHandler* dbusSignalHandler, uint32_t tag, std::string interfaceMemberSignature) {
bool outarg;
+ std::string methodName = "subscribeFor" + interfaceMemberName + "Selective";
DBusProxyHelper<CommonAPI::DBus::DBusSerializableArguments<>,
CommonAPI::DBus::DBusSerializableArguments<bool>>::callMethodAsync(
*callingProxy, methodName.c_str(), "",
&CommonAPI::DBus::defaultCallInfo,
- [this, dbusSignalHandler, callingProxy, tag]
+ [this, dbusSignalHandler, callingProxy, tag, interfaceMemberName, interfaceMemberSignature]
(const CommonAPI::CallStatus& callStatus, const bool& accepted) {
if (callStatus == CommonAPI::CallStatus::SUCCESS && accepted) {
dbusSignalHandler->onSpecificError(CommonAPI::CallStatus::SUCCESS, tag);
} else {
+ const DBusSignalHandlerPath itsToken(callingProxy->getDBusAddress().getObjectPath(),
+ callingProxy->getDBusAddress().getInterface(),
+ interfaceMemberName,
+ interfaceMemberSignature);
+ removeSignalMemberHandler(itsToken, dbusSignalHandler);
dbusSignalHandler->onSpecificError(CommonAPI::CallStatus::SUBSCRIPTION_REFUSED, tag);
}
}, std::make_tuple(outarg));
@@ -1007,30 +1055,33 @@ void DBusConnection::subscribeForSelectiveBroadcast(
std::string methodName = "subscribeFor" + interfaceMemberName + "Selective";
+ DBusProxyConnection::DBusSignalHandlerToken token = addSignalMemberHandler(
+ objectPath,
+ interfaceName,
+ interfaceMemberName,
+ interfaceMemberSignature,
+ dbusSignalHandler,
+ true
+ );
+ dbusSignalHandler->setSubscriptionToken(token, tag);
+
bool outarg;
- DBusProxyHelper<CommonAPI::DBus::DBusSerializableArguments<>,
- CommonAPI::DBus::DBusSerializableArguments<bool>>::callMethodAsync(
- *callingProxy, methodName.c_str(), "",
- &CommonAPI::DBus::defaultCallInfo,
- [this, objectPath, interfaceName, interfaceMemberName, interfaceMemberSignature, dbusSignalHandler, callingProxy, tag]
- (const CommonAPI::CallStatus& callStatus, const bool& accepted) {
- if ((callStatus == CommonAPI::CallStatus::SUCCESS && accepted) || !callingProxy->isAvailable()) {
- DBusProxyConnection::DBusSignalHandlerToken token = addSignalMemberHandler(
- objectPath,
- interfaceName,
- interfaceMemberName,
- interfaceMemberSignature,
- dbusSignalHandler,
- true
- );
- dbusSignalHandler->setSubscriptionToken(token, tag);
- }
- if (accepted) {
- dbusSignalHandler->onSpecificError(CommonAPI::CallStatus::SUCCESS, tag);
- } else {
- dbusSignalHandler->onSpecificError(CommonAPI::CallStatus::SUBSCRIPTION_REFUSED, tag);
- }
- }, std::make_tuple(outarg));
+ if(callingProxy->isAvailable()) {
+ DBusProxyHelper<CommonAPI::DBus::DBusSerializableArguments<>,
+ CommonAPI::DBus::DBusSerializableArguments<bool>>::callMethodAsync(
+ *callingProxy, methodName.c_str(), "",
+ &CommonAPI::DBus::defaultCallInfo,
+ [this, objectPath, interfaceName, interfaceMemberName, interfaceMemberSignature, dbusSignalHandler, callingProxy, tag, token]
+ (const CommonAPI::CallStatus& callStatus, const bool& accepted) {
+ (void)callStatus;
+ if (accepted) {
+ dbusSignalHandler->onSpecificError(CommonAPI::CallStatus::SUCCESS, tag);
+ } else {
+ removeSignalMemberHandler(token, dbusSignalHandler);
+ dbusSignalHandler->onSpecificError(CommonAPI::CallStatus::SUBSCRIPTION_REFUSED, tag);
+ }
+ }, std::make_tuple(outarg));
+ }
}
void DBusConnection::unsubscribeFromSelectiveBroadcast(const std::string& eventName,
@@ -1585,6 +1636,9 @@ void notifyDBusOMSignalHandlers(DBusSignalHandlersTable& dbusSignalHandlerstable
signalEntry->second.first->lock();
signalGuard_.unlock();
+ // ensure, the registry survives
+ std::shared_ptr<DBusServiceRegistry> itsRegistry_ = DBusServiceRegistry::get(shared_from_this());
+
notifyDBusSignalHandlers(dbusSignalHandlerTable_,
signalEntry, dbusMessage, dbusHandlerResult);
@@ -1668,14 +1722,27 @@ std::shared_ptr<DBusConnection> DBusConnection::wrap(::DBusConnection *_connecti
return std::make_shared<DBusConnection>(_connection, _connectionId);
}
-void DBusConnection::pushDBusMessageReply(const DBusMessage& _reply,
+void DBusConnection::pushDBusMessageReplyToMainLoop(const DBusMessage& _reply,
std::unique_ptr<DBusMessageReplyAsyncHandler> _dbusMessageReplyAsyncHandler) {
// push message to the message queue
DBusMessageReplyAsyncHandler* replyAsyncHandler = _dbusMessageReplyAsyncHandler.release();
replyAsyncHandler->setHasToBeDeleted();
- std::shared_ptr<DBusMessageWatch::MsgReplyQueueEntry> msgReplyQueueEntry = std::make_shared<DBusMessageWatch::MsgReplyQueueEntry>(
+ std::shared_ptr<MsgReplyQueueEntry> msgReplyQueueEntry = std::make_shared<MsgReplyQueueEntry>(
replyAsyncHandler, _reply);
- msgWatch_->pushMsgQueue(msgReplyQueueEntry);
+ queueWatch_->pushQueue(msgReplyQueueEntry);
+}
+
+void DBusConnection::setPendingCallTimedOut(DBusPendingCall* _pendingCall, ::DBusTimeout* _timeout) const {
+ std::lock_guard<std::mutex> lock(enforceTimeoutMutex_);
+ auto it = timeoutMap_.find(_pendingCall);
+ if(it != timeoutMap_.end()) {
+ auto replyAsyncHandler = std::get<1>(it->second);
+ replyAsyncHandler->lock();
+ if(!replyAsyncHandler->getTimeoutOccurred()) {
+ dbus_timeout_handle(_timeout);
+ }
+ replyAsyncHandler->unlock();
+ }
}
} // namespace DBus
diff --git a/src/CommonAPI/DBus/DBusDaemonProxy.cpp b/src/CommonAPI/DBus/DBusDaemonProxy.cpp
index 58ee08f..3b5b29f 100644
--- a/src/CommonAPI/DBus/DBusDaemonProxy.cpp
+++ b/src/CommonAPI/DBus/DBusDaemonProxy.cpp
@@ -62,7 +62,15 @@ bool DBusDaemonProxy::isAvailableBlocking() const {
std::future<AvailabilityStatus> DBusDaemonProxy::isAvailableAsync(
isAvailableAsyncCallback _callback,
const CallInfo *_info) const {
- return isAvailableAsync(_callback, _info);
+ (void)_callback;
+ (void)_info;
+ std::promise<AvailabilityStatus> promise;
+ if(isAvailable()) {
+ promise.set_value(CommonAPI::AvailabilityStatus::AVAILABLE);
+ } else {
+ promise.set_value(CommonAPI::AvailabilityStatus::NOT_AVAILABLE);
+ }
+ return promise.get_future();
}
ProxyStatusEvent& DBusDaemonProxy::getProxyStatusEvent() {
diff --git a/src/CommonAPI/DBus/DBusFactory.cpp b/src/CommonAPI/DBus/DBusFactory.cpp
index d7fa84c..1c05455 100644
--- a/src/CommonAPI/DBus/DBusFactory.cpp
+++ b/src/CommonAPI/DBus/DBusFactory.cpp
@@ -47,7 +47,9 @@ Factory::init() {
void
Factory::registerInterface(InterfaceInitFunction _function) {
+#ifndef WIN32
std::lock_guard<std::mutex> itsLock(initializerMutex_);
+#endif
if (isInitialized_) {
// We are already running --> initialize the interface library!
_function();
@@ -79,11 +81,15 @@ Factory::createProxy(
DBusAddress dbusAddress;
if (DBusAddressTranslator::get()->translate(address, dbusAddress)) {
- std::shared_ptr<DBusProxy> proxy
- = proxyCreateFunctionsIterator->second(dbusAddress, getConnection(_connectionId));
- if (proxy)
- proxy->init();
- return proxy;
+ std::shared_ptr<DBusConnection> connection
+ = getConnection(_connectionId);
+ if (connection) {
+ std::shared_ptr<DBusProxy> proxy
+ = proxyCreateFunctionsIterator->second(dbusAddress, connection);
+ if (proxy)
+ proxy->init();
+ return proxy;
+ }
}
}
return nullptr;
@@ -100,11 +106,15 @@ Factory::createProxy(
DBusAddress dbusAddress;
if (DBusAddressTranslator::get()->translate(address, dbusAddress)) {
- std::shared_ptr<DBusProxy> proxy
- = proxyCreateFunctionsIterator->second(dbusAddress, getConnection(_context));
- if (proxy)
- proxy->init();
- return proxy;
+ std::shared_ptr<DBusConnection> connection
+ = getConnection(_context);
+ if (connection) {
+ std::shared_ptr<DBusProxy> proxy
+ = proxyCreateFunctionsIterator->second(dbusAddress, connection);
+ if (proxy)
+ proxy->init();
+ return proxy;
+ }
}
}
@@ -120,11 +130,15 @@ Factory::registerStub(
CommonAPI::Address address(_domain, _interface, _instance);
DBusAddress dbusAddress;
if (DBusAddressTranslator::get()->translate(address, dbusAddress)) {
- std::shared_ptr<DBusStubAdapter> adapter
- = stubAdapterCreateFunctionsIterator->second(dbusAddress, getConnection(_connectionId), _stub);
- if (adapter) {
- adapter->init(adapter);
- return registerStubAdapter(adapter);
+ std::shared_ptr<DBusConnection> connection = getConnection(_connectionId);
+ if (connection) {
+ std::shared_ptr<DBusStubAdapter> adapter
+ = stubAdapterCreateFunctionsIterator->second(dbusAddress, connection, _stub);
+ if (adapter) {
+ adapter->init(adapter);
+ if (registerStubAdapter(adapter))
+ return true;
+ }
}
}
}
@@ -141,11 +155,15 @@ Factory::registerStub(
CommonAPI::Address address(_domain, _interface, _instance);
DBusAddress dbusAddress;
if (DBusAddressTranslator::get()->translate(address, dbusAddress)) {
- std::shared_ptr<DBusStubAdapter> adapter
- = stubAdapterCreateFunctionsIterator->second(dbusAddress, getConnection(_context), _stub);
- if (adapter) {
- adapter->init(adapter);
- return registerStubAdapter(adapter);
+ std::shared_ptr<DBusConnection> connection = getConnection(_context);
+ if (connection) {
+ std::shared_ptr<DBusStubAdapter> adapter
+ = stubAdapterCreateFunctionsIterator->second(dbusAddress, connection, _stub);
+ if (adapter) {
+ adapter->init(adapter);
+ if (registerStubAdapter(adapter))
+ return true;
+ }
}
}
}
@@ -175,8 +193,10 @@ Factory::unregisterStub(const std::string &_domain, const std::string &_interfac
services_.erase(adapterResult->first);
+ decrementConnection(connection);
+
return true;
- }
+ }
return false;
}
@@ -245,20 +265,29 @@ Factory::unregisterStubAdapter(std::shared_ptr<DBusStubAdapter> _adapter) {
///////////////////////////////////////////////////////////////////////////////
std::shared_ptr<DBusConnection>
Factory::getConnection(const ConnectionId_t &_connectionId) {
-
- std::lock_guard<std::mutex> itsGuard(connectionsMutex_);
+ std::lock_guard<std::recursive_mutex> itsConnectionGuard(connectionsMutex_);
+ std::shared_ptr<DBusConnection> itsConnection;
auto itsConnectionIterator = connections_.find(_connectionId);
- if (itsConnectionIterator != connections_.end()) {
- return itsConnectionIterator->second;
+ if (itsConnectionIterator != connections_.end())
+ itsConnection = itsConnectionIterator->second;
+
+ if(!itsConnection) {
+ // No connection found, lets create and initialize one
+ const DBusType_t dbusType = DBusAddressTranslator::get()->getDBusBusType(_connectionId);
+ itsConnection = std::make_shared<DBusConnection>(dbusType, _connectionId);
+
+ if (itsConnection) {
+ if (!itsConnection->connect(true)) {
+ COMMONAPI_ERROR("Failed to create connection ", _connectionId);
+ itsConnection.reset();
+ } else {
+ connections_.insert({ _connectionId, itsConnection } );
+ }
+ }
}
- // No connection found, lets create and initialize one
- const DBusType_t dbusType = DBusAddressTranslator::get()->getDBusBusType(_connectionId);
- std::shared_ptr<DBusConnection> itsConnection
- = std::make_shared<DBusConnection>(dbusType, _connectionId);
- connections_.insert({ _connectionId, itsConnection });
+ incrementConnection(itsConnection);
- itsConnection->connect(true);
return itsConnection;
}
@@ -267,21 +296,29 @@ Factory::getConnection(std::shared_ptr<MainLoopContext> _context) {
if (!_context)
return getConnection(DEFAULT_CONNECTION_ID);
- std::lock_guard<std::mutex> itsGuard(contextConnectionsMutex_);
- auto itsConnectionIterator = contextConnections_.find(_context.get());
- if (itsConnectionIterator != contextConnections_.end()) {
- return itsConnectionIterator->second;
+ std::lock_guard<std::recursive_mutex> itsConnectionGuard(connectionsMutex_);
+ std::shared_ptr<DBusConnection> itsConnection;
+ auto itsConnectionIterator = connections_.find(_context->getName());
+ if (itsConnectionIterator != connections_.end())
+ itsConnection = itsConnectionIterator->second;
+
+ if(!itsConnection) {
+ // No connection found, lets create and initialize one
+ const DBusType_t dbusType = DBusAddressTranslator::get()->getDBusBusType(_context->getName());
+ itsConnection = std::make_shared<DBusConnection>(dbusType, _context->getName());
+
+ if (itsConnection) {
+ if (!itsConnection->connect(false)) {
+ itsConnection.reset();
+ } else {
+ connections_.insert({ _context->getName(), itsConnection } );
+ itsConnection->attachMainLoopContext(_context);
+ }
+ }
}
- // No connection found, lets create and initialize one
- const DBusType_t dbusType = DBusAddressTranslator::get()->getDBusBusType(_context->getName());
- std::shared_ptr<DBusConnection> itsConnection
- = std::make_shared<DBusConnection>(dbusType, _context->getName());
- contextConnections_.insert({ _context.get(), itsConnection } );
-
- itsConnection->connect(false);
- if (_context)
- itsConnection->attachMainLoopContext(_context);
+ if (itsConnection)
+ incrementConnection(itsConnection);
return itsConnection;
}
@@ -343,9 +380,11 @@ Factory::registerManagedService(const std::shared_ptr<DBusStubAdapter> &_stubAda
services_.erase(insertResult.first);
}
+ if(isAcquired)
+ incrementConnection(_stubAdapter->getDBusConnection());
+
return isAcquired;
}
-
return false;
}
@@ -370,90 +409,47 @@ Factory::unregisterManagedService(const ServicesMap::iterator &iterator) {
if (isUnregistered) {
connection->releaseServiceName(serviceName);
services_.erase(iterator);
+ decrementConnection(connection);
}
// TODO: log error
return isUnregistered;
}
void Factory::incrementConnection(std::shared_ptr<DBusProxyConnection> _connection) {
+ std::lock_guard<std::recursive_mutex> itsConnectionGuard(connectionsMutex_);
std::shared_ptr<DBusConnection> connection;
- {
- std::lock_guard<std::mutex> itsConnectionGuard(connectionsMutex_);
- for (auto itsConnectionIterator = connections_.begin(); itsConnectionIterator != connections_.end(); itsConnectionIterator++) {
- if (itsConnectionIterator->second == _connection) {
- connection = itsConnectionIterator->second;
- break;
- }
+ for (auto itsConnectionIterator = connections_.begin(); itsConnectionIterator != connections_.end(); itsConnectionIterator++) {
+ if (itsConnectionIterator->second == _connection) {
+ connection = itsConnectionIterator->second;
+ break;
}
}
if(connection)
connection->incrementConnection();
-
- std::shared_ptr<DBusConnection> contextConnection;
- {
- std::lock_guard<std::mutex> itsContextConnectionGuard(contextConnectionsMutex_);
- for (auto itsConnectionIterator = contextConnections_.begin(); itsConnectionIterator != contextConnections_.end(); itsConnectionIterator++) {
- if (itsConnectionIterator->second == _connection) {
- contextConnection = itsConnectionIterator->second;
- break;
- }
- }
- }
-
- if(contextConnection)
- contextConnection->incrementConnection();
}
void Factory::decrementConnection(std::shared_ptr<DBusProxyConnection> _connection) {
+ std::lock_guard<std::recursive_mutex> itsConnectionGuard(connectionsMutex_);
std::shared_ptr<DBusConnection> connection;
- {
- std::lock_guard<std::mutex> itsConnectionGuard(connectionsMutex_);
- for (auto itsConnectionIterator = connections_.begin(); itsConnectionIterator != connections_.end(); itsConnectionIterator++) {
- if (itsConnectionIterator->second == _connection) {
- connection = itsConnectionIterator->second;
- break;
- }
+ for (auto itsConnectionIterator = connections_.begin(); itsConnectionIterator != connections_.end(); itsConnectionIterator++) {
+ if (itsConnectionIterator->second == _connection) {
+ connection = itsConnectionIterator->second;
+ break;
}
}
if(connection)
connection->decrementConnection();
-
- std::shared_ptr<DBusConnection> contextConnection;
- {
- std::lock_guard<std::mutex> itsContextConnectionGuard(contextConnectionsMutex_);
- for (auto itsConnectionIterator = contextConnections_.begin(); itsConnectionIterator != contextConnections_.end(); itsConnectionIterator++) {
- if (itsConnectionIterator->second == _connection) {
- contextConnection = itsConnectionIterator->second;
- break;
- }
- }
- }
-
- if(contextConnection)
- contextConnection->decrementConnection();
}
-void Factory::releaseConnection(const ConnectionId_t& _connectionId, MainLoopContext* _mainloopContext) {
- {
- std::lock_guard<std::mutex> itsConnectionGuard(connectionsMutex_);
- auto connection = connections_.find(_connectionId);
+void Factory::releaseConnection(const ConnectionId_t& _connectionId) {
+ std::lock_guard<std::recursive_mutex> itsConnectionGuard(connectionsMutex_);
+ auto itsConnection = connections_.find(_connectionId);
- if (connection != connections_.end()) {
- DBusServiceRegistry::remove(connection->second);
- connections_.erase(_connectionId);
- }
- }
-
- {
- std::lock_guard<std::mutex> itsContextConnectionGuard(contextConnectionsMutex_);
- auto connectionContext = contextConnections_.find(_mainloopContext);
-
- if (connectionContext != contextConnections_.end()) {
- DBusServiceRegistry::remove(connectionContext->second);
- contextConnections_.erase(_mainloopContext);
- }
+ if (itsConnection != connections_.end()) {
+ DBusServiceRegistry::remove(itsConnection->second);
+ connections_.erase(_connectionId);
}
}
diff --git a/src/CommonAPI/DBus/DBusInstanceAvailabilityStatusChangedEvent.cpp b/src/CommonAPI/DBus/DBusInstanceAvailabilityStatusChangedEvent.cpp
new file mode 100644
index 0000000..4f93feb
--- /dev/null
+++ b/src/CommonAPI/DBus/DBusInstanceAvailabilityStatusChangedEvent.cpp
@@ -0,0 +1,269 @@
+// Copyright (C) 2013-2015 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+#include <CommonAPI/DBus/DBusInstanceAvailabilityStatusChangedEvent.hpp>
+
+#include <CommonAPI/DBus/DBusAddressTranslator.hpp>
+
+namespace CommonAPI {
+namespace DBus {
+
+DBusInstanceAvailabilityStatusChangedEvent::DBusInstanceAvailabilityStatusChangedEvent(
+ DBusProxy &_proxy,
+ const std::string &_dbusInterfaceName,
+ const std::string &_capiInterfaceName) :
+ proxy_(_proxy),
+ observedDbusInterfaceName_(_dbusInterfaceName),
+ observedCapiInterfaceName_(_capiInterfaceName),
+ registry_(DBusServiceRegistry::get(_proxy.getDBusConnection())) {
+}
+
+DBusInstanceAvailabilityStatusChangedEvent::~DBusInstanceAvailabilityStatusChangedEvent() {
+ proxy_.removeSignalMemberHandler(interfacesAddedSubscription_, this);
+ proxy_.removeSignalMemberHandler(interfacesRemovedSubscription_, this);
+}
+
+void DBusInstanceAvailabilityStatusChangedEvent::onSignalDBusMessage(const DBusMessage& dbusMessage) {
+ if (dbusMessage.hasMemberName("InterfacesAdded")) {
+ onInterfacesAddedSignal(dbusMessage);
+ } else if (dbusMessage.hasMemberName("InterfacesRemoved")) {
+ onInterfacesRemovedSignal(dbusMessage);
+ }
+}
+
+void DBusInstanceAvailabilityStatusChangedEvent::getAvailableServiceInstances(
+ CommonAPI::CallStatus &_status,
+ std::vector<DBusAddress> &_availableServiceInstances) {
+
+ _availableServiceInstances.clear();
+ DBusObjectManagerStub::DBusObjectPathAndInterfacesDict itsAvailableServiceInstances;
+ registry_->getAvailableServiceInstances(proxy_.getDBusAddress().getService(),
+ proxy_.getDBusAddress().getObjectPath(),
+ itsAvailableServiceInstances);
+
+ _status = CommonAPI::CallStatus::SUCCESS;
+ translate(itsAvailableServiceInstances, _availableServiceInstances);
+}
+
+std::future<CallStatus> DBusInstanceAvailabilityStatusChangedEvent::getAvailableServiceInstancesAsync(
+ GetAvailableServiceInstancesCallback _callback) {
+
+ std::shared_ptr<std::promise<CallStatus> > promise = std::make_shared<std::promise<CallStatus>>();
+ registry_->getAvailableServiceInstancesAsync(std::bind(
+ &DBusInstanceAvailabilityStatusChangedEvent::serviceInstancesAsyncCallback,
+ this,
+ proxy_.shared_from_this(),
+ std::placeholders::_1,
+ _callback,
+ promise),
+ proxy_.getDBusAddress().getService(),
+ proxy_.getDBusAddress().getObjectPath());
+ return promise->get_future();
+}
+
+void DBusInstanceAvailabilityStatusChangedEvent::getServiceInstanceAvailabilityStatus(
+ const std::string &_instance,
+ CallStatus &_callStatus,
+ AvailabilityStatus &_availabilityStatus) {
+
+ CommonAPI::Address itsAddress("local", observedCapiInterfaceName_, _instance);
+ DBusAddress itsDBusAddress;
+ DBusAddressTranslator::get()->translate(itsAddress, itsDBusAddress);
+
+ _availabilityStatus = AvailabilityStatus::NOT_AVAILABLE;
+ if (registry_->isServiceInstanceAlive(
+ itsDBusAddress.getInterface(),
+ itsDBusAddress.getService(),
+ itsDBusAddress.getObjectPath())) {
+ _availabilityStatus = AvailabilityStatus::AVAILABLE;
+ }
+ _callStatus = CallStatus::SUCCESS;
+}
+
+std::future<CallStatus> DBusInstanceAvailabilityStatusChangedEvent::getServiceInstanceAvailabilityStatusAsync(
+ const std::string& _instance,
+ ProxyManager::GetInstanceAvailabilityStatusCallback _callback) {
+
+ std::shared_ptr<std::promise<CallStatus> > promise = std::make_shared<std::promise<CallStatus>>();
+ auto proxy = proxy_.shared_from_this();
+ std::async(std::launch::async, [this, _instance, _callback, promise, proxy]() {
+ CallStatus callStatus;
+ AvailabilityStatus availabilityStatus;
+ getServiceInstanceAvailabilityStatus(_instance, callStatus, availabilityStatus);
+ _callback(callStatus, availabilityStatus);
+ promise->set_value(callStatus);
+ });
+
+ return promise->get_future();
+}
+
+void DBusInstanceAvailabilityStatusChangedEvent::onFirstListenerAdded(const Listener&) {
+
+ interfacesAddedSubscription_ = proxy_.addSignalMemberHandler(
+ proxy_.getDBusAddress().getObjectPath(),
+ DBusObjectManagerStub::getInterfaceName(),
+ "InterfacesAdded",
+ "oa{sa{sv}}",
+ this,
+ false);
+
+ interfacesRemovedSubscription_ = proxy_.addSignalMemberHandler(
+ proxy_.getDBusAddress().getObjectPath(),
+ DBusObjectManagerStub::getInterfaceName(),
+ "InterfacesRemoved",
+ "oas",
+ this,
+ false);
+
+ getAvailableServiceInstancesAsync([&](const CallStatus &_status,
+ const std::vector<DBusAddress> &_availableServices) {
+ if(_status == CallStatus::SUCCESS) {
+ for(auto service : _availableServices) {
+ if(service.getInterface() != observedDbusInterfaceName_)
+ continue;
+ if(addInterface(service.getObjectPath(), observedDbusInterfaceName_))
+ notifyInterfaceStatusChanged(service.getObjectPath(), observedDbusInterfaceName_, AvailabilityStatus::AVAILABLE);
+ }
+ }
+ });
+}
+
+void DBusInstanceAvailabilityStatusChangedEvent::onLastListenerRemoved(const Listener&) {
+ proxy_.removeSignalMemberHandler(interfacesAddedSubscription_, this);
+ proxy_.removeSignalMemberHandler(interfacesRemovedSubscription_, this);
+}
+
+void DBusInstanceAvailabilityStatusChangedEvent::onInterfacesAddedSignal(const DBusMessage &_message) {
+ DBusInputStream dbusInputStream(_message);
+ std::string dbusObjectPath;
+ std::string dbusInterfaceName;
+ DBusInterfacesAndPropertiesDict dbusInterfacesAndPropertiesDict;
+
+ dbusInputStream >> dbusObjectPath;
+ if (dbusInputStream.hasError()) {
+ COMMONAPI_ERROR(std::string(__FUNCTION__) + " failed to read object path");
+ }
+
+ dbusInputStream.beginReadMapOfSerializableStructs();
+ while (!dbusInputStream.readMapCompleted()) {
+ dbusInputStream.align(8);
+ dbusInputStream >> dbusInterfaceName;
+ dbusInputStream.skipMap();
+ if (dbusInputStream.hasError()) {
+ COMMONAPI_ERROR(std::string(__FUNCTION__) + " failed to read interface name");
+ }
+ if(dbusInterfaceName == observedDbusInterfaceName_ && addInterface(dbusObjectPath, dbusInterfaceName)) {
+ notifyInterfaceStatusChanged(dbusObjectPath, dbusInterfaceName, AvailabilityStatus::AVAILABLE);
+ }
+ }
+ dbusInputStream.endReadMapOfSerializableStructs();
+}
+
+void DBusInstanceAvailabilityStatusChangedEvent::onInterfacesRemovedSignal(const DBusMessage &_message) {
+ DBusInputStream dbusInputStream(_message);
+ std::string dbusObjectPath;
+ std::vector<std::string> dbusInterfaceNames;
+
+ dbusInputStream >> dbusObjectPath;
+ if (dbusInputStream.hasError()) {
+ COMMONAPI_ERROR(std::string(__FUNCTION__) + " failed to read object path");
+ }
+
+ dbusInputStream >> dbusInterfaceNames;
+ if (dbusInputStream.hasError()) {
+ COMMONAPI_ERROR(std::string(__FUNCTION__) + " failed to read interface names");
+ }
+
+ for (const auto& dbusInterfaceName : dbusInterfaceNames) {
+ if(dbusInterfaceName == observedDbusInterfaceName_ && removeInterface(dbusObjectPath, dbusInterfaceName)) {
+ notifyInterfaceStatusChanged(dbusObjectPath, dbusInterfaceName, AvailabilityStatus::NOT_AVAILABLE);
+ }
+ }
+}
+
+void DBusInstanceAvailabilityStatusChangedEvent::notifyInterfaceStatusChanged(
+ const std::string &_objectPath,
+ const std::string &_interfaceName,
+ const AvailabilityStatus &_availability) {
+ CommonAPI::Address itsAddress;
+ DBusAddress itsDBusAddress(proxy_.getDBusAddress().getService(),
+ _objectPath,
+ _interfaceName);
+
+ DBusAddressTranslator::get()->translate(itsDBusAddress, itsAddress);
+
+ // ensure, the proxy and the event survives until notification is done
+ auto itsProxy = proxy_.shared_from_this();
+
+ notifyListeners(itsAddress.getAddress(), _availability);
+}
+
+bool DBusInstanceAvailabilityStatusChangedEvent::addInterface(
+ const std::string &_dbusObjectPath,
+ const std::string &_dbusInterfaceName) {
+ std::lock_guard<std::mutex> lock(interfacesMutex_);
+ auto it = interfaces_.find(_dbusObjectPath);
+ if (it == interfaces_.end()) {
+ std::set<std::string> itsInterfaces;
+ itsInterfaces.insert(_dbusInterfaceName);
+ interfaces_[_dbusObjectPath] = itsInterfaces;
+ return true;
+ } else {
+ if(it->second.insert(_dbusInterfaceName).second)
+ return true;
+ }
+ return false;
+}
+
+bool DBusInstanceAvailabilityStatusChangedEvent::removeInterface(
+ const std::string &_dbusObjectPath,
+ const std::string &_dbusInterfaceName) {
+ std::lock_guard<std::mutex> lock(interfacesMutex_);
+ auto it = interfaces_.find(_dbusObjectPath);
+ if(it != interfaces_.end()) {
+ if(it->second.erase(_dbusInterfaceName) > 0)
+ return true;
+ }
+ return false;
+}
+
+void DBusInstanceAvailabilityStatusChangedEvent::serviceInstancesAsyncCallback(
+ std::shared_ptr<Proxy> _proxy,
+ const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict _dict,
+ GetAvailableServiceInstancesCallback &_call,
+ std::shared_ptr<std::promise<CallStatus> > &_promise) {
+ (void)_proxy;
+ std::vector<DBusAddress> result;
+ translate(_dict, result);
+ _call(CallStatus::SUCCESS, result);
+ _promise->set_value(CallStatus::SUCCESS);
+}
+
+void DBusInstanceAvailabilityStatusChangedEvent::translate(
+ const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict &_dict,
+ std::vector<DBusAddress> &_serviceInstances) {
+ DBusAddress itsDBusAddress;
+
+ const std::string &itsService = proxy_.getDBusAddress().getService();
+ itsDBusAddress.setService(itsService);
+
+ for (const auto &objectPathIter : _dict) {
+ itsDBusAddress.setObjectPath(objectPathIter.first);
+
+ const auto &interfacesDict = objectPathIter.second;
+ for (const auto &interfaceIter : interfacesDict) {
+
+ if (interfaceIter.first == observedDbusInterfaceName_) {
+ itsDBusAddress.setInterface(interfaceIter.first);
+ _serviceInstances.push_back(itsDBusAddress);
+ }
+ }
+ }
+}
+
+} // namespace DBus
+} // namespace CommonAPI
+
+
diff --git a/src/CommonAPI/DBus/DBusMainLoopContext.cpp b/src/CommonAPI/DBus/DBusMainLoopContext.cpp
index 5c8afa5..428807f 100644
--- a/src/CommonAPI/DBus/DBusMainLoopContext.cpp
+++ b/src/CommonAPI/DBus/DBusMainLoopContext.cpp
@@ -41,36 +41,36 @@ bool DBusDispatchSource::dispatch() {
return dbusConnection_->singleDispatch();
}
-DBusMessageDispatchSource::DBusMessageDispatchSource(DBusMessageWatch* watch) :
+DBusQueueDispatchSource::DBusQueueDispatchSource(DBusQueueWatch* watch) :
watch_(watch) {
watch_->addDependentDispatchSource(this);
}
-DBusMessageDispatchSource::~DBusMessageDispatchSource() {
+DBusQueueDispatchSource::~DBusQueueDispatchSource() {
std::unique_lock<std::mutex> itsLock(watchMutex_);
watch_->removeDependentDispatchSource(this);
}
-bool DBusMessageDispatchSource::prepare(int64_t& timeout) {
+bool DBusQueueDispatchSource::prepare(int64_t& timeout) {
std::unique_lock<std::mutex> itsLock(watchMutex_);
timeout = -1;
- return !watch_->emptyMsgQueue();
+ return !watch_->emptyQueue();
}
-bool DBusMessageDispatchSource::check() {
+bool DBusQueueDispatchSource::check() {
std::unique_lock<std::mutex> itsLock(watchMutex_);
- return !watch_->emptyMsgQueue();
+ return !watch_->emptyQueue();
}
-bool DBusMessageDispatchSource::dispatch() {
+bool DBusQueueDispatchSource::dispatch() {
std::unique_lock<std::mutex> itsLock(watchMutex_);
- if (!watch_->emptyMsgQueue()) {
- auto queueEntry = watch_->frontMsgQueue();
- watch_->popMsgQueue();
- watch_->processMsgQueueEntry(queueEntry);
+ if (!watch_->emptyQueue()) {
+ auto queueEntry = watch_->frontQueue();
+ watch_->popQueue();
+ watch_->processQueueEntry(queueEntry);
}
- return !watch_->emptyMsgQueue();
+ return !watch_->emptyQueue();
}
DBusWatch::DBusWatch(::DBusWatch* libdbusWatch, std::weak_ptr<MainLoopContext>& mainLoopContext,
@@ -179,19 +179,7 @@ void DBusWatch::addDependentDispatchSource(DispatchSource* dispatchSource) {
dependentDispatchSources_.push_back(dispatchSource);
}
-void DBusMessageWatch::MsgReplyQueueEntry::process(std::shared_ptr<DBusConnection> _connection) {
- _connection->dispatchDBusMessageReply(message_, replyAsyncHandler_);
-}
-
-void DBusMessageWatch::MsgReplyQueueEntry::clear() {
- delete replyAsyncHandler_;
-}
-
-void DBusMessageWatch::MsgQueueEntry::clear() {
-
-}
-
-DBusMessageWatch::DBusMessageWatch(std::shared_ptr<DBusConnection> _connection) : pipeValue_(4) {
+DBusQueueWatch::DBusQueueWatch(std::shared_ptr<DBusConnection> _connection) : pipeValue_(4) {
#ifdef WIN32
std::string pipeName = "\\\\.\\pipe\\CommonAPI-DBus-";
@@ -280,7 +268,7 @@ DBusMessageWatch::DBusMessageWatch(std::shared_ptr<DBusConnection> _connection)
connection_ = _connection;
}
-DBusMessageWatch::~DBusMessageWatch() {
+DBusQueueWatch::~DBusQueueWatch() {
#ifdef WIN32
BOOL retVal = DisconnectNamedPipe((HANDLE)pipeFileDescriptors_[0]);
@@ -304,36 +292,36 @@ DBusMessageWatch::~DBusMessageWatch() {
close(pipeFileDescriptors_[1]);
#endif
- std::unique_lock<std::mutex> itsLock(msgQueueMutex_);
- while(!msgQueue_.empty()) {
- auto queueEntry = msgQueue_.front();
- msgQueue_.pop();
+ std::unique_lock<std::mutex> itsLock(queueMutex_);
+ while(!queue_.empty()) {
+ auto queueEntry = queue_.front();
+ queue_.pop();
queueEntry->clear();
}
}
-void DBusMessageWatch::dispatch(unsigned int) {
+void DBusQueueWatch::dispatch(unsigned int) {
}
-const pollfd& DBusMessageWatch::getAssociatedFileDescriptor() {
+const pollfd& DBusQueueWatch::getAssociatedFileDescriptor() {
return pollFileDescriptor_;
}
#ifdef WIN32
-const HANDLE& DBusMessageWatch::getAssociatedEvent() {
+const HANDLE& DBusQueueWatch::getAssociatedEvent() {
return wsaEvent_;
}
#endif
-const std::vector<DispatchSource*>& DBusMessageWatch::getDependentDispatchSources() {
+const std::vector<DispatchSource*>& DBusQueueWatch::getDependentDispatchSources() {
return dependentDispatchSources_;
}
-void DBusMessageWatch::addDependentDispatchSource(CommonAPI::DispatchSource* _dispatchSource) {
+void DBusQueueWatch::addDependentDispatchSource(CommonAPI::DispatchSource* _dispatchSource) {
dependentDispatchSources_.push_back(_dispatchSource);
}
-void DBusMessageWatch::removeDependentDispatchSource(CommonAPI::DispatchSource* _dispatchSource) {
+void DBusQueueWatch::removeDependentDispatchSource(CommonAPI::DispatchSource* _dispatchSource) {
std::vector<CommonAPI::DispatchSource*>::iterator it;
for (it = dependentDispatchSources_.begin(); it != dependentDispatchSources_.end(); it++) {
@@ -344,9 +332,9 @@ void DBusMessageWatch::removeDependentDispatchSource(CommonAPI::DispatchSource*
}
}
-void DBusMessageWatch::pushMsgQueue(std::shared_ptr<MsgQueueEntry> _queueEntry) {
- std::unique_lock<std::mutex> itsLock(msgQueueMutex_);
- msgQueue_.push(_queueEntry);
+void DBusQueueWatch::pushQueue(std::shared_ptr<QueueEntry> _queueEntry) {
+ std::unique_lock<std::mutex> itsLock(queueMutex_);
+ queue_.push(_queueEntry);
#ifdef WIN32
char writeValue[sizeof(pipeValue_)];
@@ -371,8 +359,8 @@ void DBusMessageWatch::pushMsgQueue(std::shared_ptr<MsgQueueEntry> _queueEntry)
#endif
}
-void DBusMessageWatch::popMsgQueue() {
- std::unique_lock<std::mutex> itsLock(msgQueueMutex_);
+void DBusQueueWatch::popQueue() {
+ std::unique_lock<std::mutex> itsLock(queueMutex_);
#ifdef WIN32
char readValue[sizeof(pipeValue_)];
@@ -396,32 +384,42 @@ void DBusMessageWatch::popMsgQueue() {
}
#endif
- msgQueue_.pop();
+ queue_.pop();
}
-std::shared_ptr<DBusMessageWatch::MsgQueueEntry> DBusMessageWatch::frontMsgQueue() {
- std::unique_lock<std::mutex> itsLock(msgQueueMutex_);
+std::shared_ptr<QueueEntry> DBusQueueWatch::frontQueue() {
+ std::unique_lock<std::mutex> itsLock(queueMutex_);
- return msgQueue_.front();
+ return queue_.front();
}
-bool DBusMessageWatch::emptyMsgQueue() {
- std::unique_lock<std::mutex> itsLock(msgQueueMutex_);
+bool DBusQueueWatch::emptyQueue() {
+ std::unique_lock<std::mutex> itsLock(queueMutex_);
- return msgQueue_.empty();
+ return queue_.empty();
}
-void DBusMessageWatch::processMsgQueueEntry(std::shared_ptr<DBusMessageWatch::MsgQueueEntry> _queueEntry) {
+void DBusQueueWatch::processQueueEntry(std::shared_ptr<QueueEntry> _queueEntry) {
std::shared_ptr<DBusConnection> itsConnection = connection_.lock();
if(itsConnection) {
_queueEntry->process(itsConnection);
}
}
-DBusTimeout::DBusTimeout(::DBusTimeout* libdbusTimeout, std::weak_ptr<MainLoopContext>& mainLoopContext) :
+#ifdef WIN32
+__declspec(thread) DBusTimeout* DBusTimeout::currentTimeout_ = NULL;
+#else
+thread_local DBusTimeout* DBusTimeout::currentTimeout_ = NULL;
+#endif
+
+DBusTimeout::DBusTimeout(::DBusTimeout* libdbusTimeout, std::weak_ptr<MainLoopContext>& mainLoopContext,
+ std::weak_ptr<DBusConnection>& dbusConnection) :
dueTimeInMs_(TIMEOUT_INFINITE),
libdbusTimeout_(libdbusTimeout),
- mainLoopContext_(mainLoopContext) {
+ mainLoopContext_(mainLoopContext),
+ dbusConnection_(dbusConnection),
+ pendingCall_(NULL) {
+ currentTimeout_ = this;
}
bool DBusTimeout::isReadyToBeMonitored() {
@@ -447,9 +445,16 @@ void DBusTimeout::stopMonitoring() {
}
bool DBusTimeout::dispatch() {
- recalculateDueTime();
- dbus_timeout_handle(libdbusTimeout_);
- return true;
+ std::shared_ptr<DBusConnection> itsConnection = dbusConnection_.lock();
+ if(itsConnection) {
+ if(itsConnection->setDispatching(true)) {
+ recalculateDueTime();
+ itsConnection->setPendingCallTimedOut(pendingCall_, libdbusTimeout_);
+ itsConnection->setDispatching(false);
+ return true;
+ }
+ }
+ return false;
}
int64_t DBusTimeout::getTimeoutInterval() const {
@@ -469,5 +474,9 @@ void DBusTimeout::recalculateDueTime() {
}
}
+void DBusTimeout::setPendingCall(DBusPendingCall* _pendingCall) {
+ pendingCall_ = _pendingCall;
+}
+
} // namespace DBus
} // namespace CommonAPI
diff --git a/src/CommonAPI/DBus/DBusOutputStream.cpp b/src/CommonAPI/DBus/DBusOutputStream.cpp
index 6fd5d0a..564aa61 100644
--- a/src/CommonAPI/DBus/DBusOutputStream.cpp
+++ b/src/CommonAPI/DBus/DBusOutputStream.cpp
@@ -73,7 +73,7 @@ DBusOutputStream& DBusOutputStream::writeString(const char *_value, const uint32
if (NULL == _value) {
COMMONAPI_ERROR(std::string(__FUNCTION__) + " _value == NULL");
} else if (_value[_length] != '\0') {
- COMMONAPI_ERROR(std::string(__FUNCTION__) + " _value is not zero-terminated")
+ COMMONAPI_ERROR(std::string(__FUNCTION__) + " _value is not zero-terminated");
} else {
_writeValue(_length);
_writeRaw(_value, _length + 1);
@@ -84,7 +84,12 @@ DBusOutputStream& DBusOutputStream::writeString(const char *_value, const uint32
DBusOutputStream& DBusOutputStream::writeByteBuffer(const uint8_t *_value,
const uint32_t &_length) {
if (NULL == _value) {
- COMMONAPI_ERROR(std::string(__FUNCTION__) + " _value == NULL");
+ if (0 != _length) {
+ COMMONAPI_ERROR(std::string(__FUNCTION__) + " _value == NULL && _length != 0");
+ } else {
+ COMMONAPI_WARNING(std::string(__FUNCTION__) + " _value == NULL");
+ _writeValue(_length);
+ }
} else {
_writeValue(_length);
_writeRaw(reinterpret_cast<const char*>(_value), _length);
diff --git a/src/CommonAPI/DBus/DBusProxy.cpp b/src/CommonAPI/DBus/DBusProxy.cpp
index 95c3d59..655fa6c 100644
--- a/src/CommonAPI/DBus/DBusProxy.cpp
+++ b/src/CommonAPI/DBus/DBusProxy.cpp
@@ -9,6 +9,7 @@
#include <CommonAPI/DBus/DBusProxy.hpp>
#include <CommonAPI/DBus/DBusUtils.hpp>
#include <CommonAPI/DBus/DBusProxyAsyncSignalMemberCallbackHandler.hpp>
+#include <CommonAPI/DBus/DBusConnection.hpp>
#include <CommonAPI/Logger.hpp>
namespace CommonAPI {
@@ -18,10 +19,36 @@ DBusProxyStatusEvent::DBusProxyStatusEvent(DBusProxy *_dbusProxy)
: dbusProxy_(_dbusProxy) {
}
-void DBusProxyStatusEvent::onListenerAdded(const Listener &_listener, const Subscription _subscription) {
- (void)_subscription;
- if (dbusProxy_->isAvailable())
- _listener(AvailabilityStatus::AVAILABLE);
+void DBusProxyStatusEvent::onListenerAdded(const Listener &_listener,
+ const Subscription _subscription) {
+ std::lock_guard<std::recursive_mutex> listenersLock(listenersMutex_);
+
+ //notify listener about availability status -> push function to mainloop
+ std::weak_ptr<DBusProxy> itsdbusProxy = dbusProxy_->shared_from_this();
+ std::function<void(std::weak_ptr<DBusProxy>, Listener, Subscription)> notifySpecificListenerHandler =
+ std::bind(&DBusProxy::notifySpecificListener,
+ dbusProxy_,
+ std::placeholders::_1,
+ std::placeholders::_2,
+ std::placeholders::_3);
+ dbusProxy_->getDBusConnection()->proxyPushFunctionToMainLoop<DBusConnection>(
+ notifySpecificListenerHandler,
+ itsdbusProxy,
+ _listener,
+ _subscription);
+}
+
+void DBusProxyStatusEvent::onListenerRemoved(const Listener& _listener,
+ const Subscription _subscription) {
+ std::lock_guard<std::recursive_mutex> listenersLock(listenersMutex_);
+ (void)_listener;
+ auto listenerIt = listeners_.begin();
+ while(listenerIt != listeners_.end()) {
+ if(listenerIt->first == _subscription)
+ listenerIt = listeners_.erase(listenerIt);
+ else
+ ++listenerIt;
+ }
}
void DBusProxy::availabilityTimeoutThreadHandler() const {
@@ -36,7 +63,7 @@ void DBusProxy::availabilityTimeoutThreadHandler() const {
isAvailableAsyncCallback,
std::promise<AvailabilityStatus>,
AvailabilityStatus,
- std::chrono::time_point<std::chrono::high_resolution_clock>
+ std::chrono::steady_clock::time_point
> CallbackData_t;
std::list<CallbackData_t> callbacks;
@@ -46,14 +73,14 @@ void DBusProxy::availabilityTimeoutThreadHandler() const {
timeoutsMutex_.lock();
int timeout = std::numeric_limits<int>::max();
- std::chrono::time_point<std::chrono::high_resolution_clock> minTimeout;
+ std::chrono::steady_clock::time_point minTimeout;
if (timeouts_.size() > 0) {
auto minTimeoutElement = std::min_element(timeouts_.begin(), timeouts_.end(),
[] (const AvailabilityTimeout_t& lhs, const AvailabilityTimeout_t& rhs) {
return std::get<0>(lhs) < std::get<0>(rhs);
});
minTimeout = std::get<0>(*minTimeoutElement);
- std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now();
+ std::chrono::steady_clock::time_point now = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now();
timeout = (int)std::chrono::duration_cast<std::chrono::milliseconds>(minTimeout - now).count();
}
timeoutsMutex_.unlock();
@@ -66,21 +93,22 @@ void DBusProxy::availabilityTimeoutThreadHandler() const {
//iterate through timeouts
auto it = timeouts_.begin();
while (it != timeouts_.end()) {
- std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now();
+ std::chrono::steady_clock::time_point now = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now();
isAvailableAsyncCallback callback = std::get<1>(*it);
if (now > std::get<0>(*it)) {
//timeout
availabilityMutex_.lock();
+ std::chrono::steady_clock::time_point timepoint_;
if(isAvailable())
callbacks.push_back(std::make_tuple(callback, std::move(std::get<2>(*it)),
AvailabilityStatus::AVAILABLE,
- std::chrono::time_point<std::chrono::high_resolution_clock>()));
+ timepoint_));
else
callbacks.push_back(std::make_tuple(callback, std::move(std::get<2>(*it)),
AvailabilityStatus::NOT_AVAILABLE,
- std::chrono::time_point<std::chrono::high_resolution_clock>()));
+ timepoint_));
it = timeouts_.erase(it);
availabilityMutex_.unlock();
} else {
@@ -131,7 +159,7 @@ void DBusProxy::availabilityTimeoutThreadHandler() const {
isAvailableAsyncCallback callback;
AvailabilityStatus avStatus;
int remainingTimeout;
- std::chrono::high_resolution_clock::time_point now;
+ std::chrono::steady_clock::time_point now;
auto it = callbacks.begin();
while(it != callbacks.end()) {
@@ -139,7 +167,7 @@ void DBusProxy::availabilityTimeoutThreadHandler() const {
avStatus = std::get<2>(*it);
// compute remaining timeout
- now = std::chrono::high_resolution_clock::now();
+ now = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now();
remainingTimeout = (int)std::chrono::duration_cast<std::chrono::milliseconds>(std::get<3>(*it) - now).count();
if(remainingTimeout < 0)
remainingTimeout = 0;
@@ -170,10 +198,10 @@ DBusProxy::DBusProxy(const DBusAddress &_dbusAddress,
interfaceVersionAttribute_(*this, "uu", "getInterfaceVersion"),
dbusServiceRegistry_(DBusServiceRegistry::get(_connection))
{
- Factory::get()->incrementConnection(connection_);
}
void DBusProxy::init() {
+ selfReference_ = shared_from_this();
dbusServiceRegistrySubscription_ = dbusServiceRegistry_->subscribeAvailabilityListener(
getAddress().getAddress(),
std::bind(&DBusProxy::onDBusServiceInstanceStatus, this, std::placeholders::_1));
@@ -216,7 +244,7 @@ std::future<AvailabilityStatus> DBusProxy::isAvailableAsync(
std::future<AvailabilityStatus> future = promise.get_future();
//set timeout point
- auto timeoutPoint = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(_info->timeout_);
+ auto timeoutPoint = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now() + std::chrono::milliseconds(_info->timeout_);
timeoutsMutex_.lock();
if(timeouts_.size() == 0) {
@@ -258,6 +286,10 @@ std::future<AvailabilityStatus> DBusProxy::isAvailableAsync(
return future;
}
+AvailabilityStatus DBusProxy::getAvailabilityStatus() const {
+ return availabilityStatus_;
+}
+
ProxyStatusEvent& DBusProxy::getProxyStatusEvent() {
return dbusProxyStatusEvent_;
}
@@ -286,7 +318,24 @@ void DBusProxy::signalInitialValueCallback(const CallStatus _status,
}
}
+void DBusProxy::notifySpecificListener(std::weak_ptr<DBusProxy> _dbusProxy,
+ const ProxyStatusEvent::Listener &_listener,
+ const ProxyStatusEvent::Subscription _subscription) {
+ if(_dbusProxy.lock()) {
+ std::lock_guard<std::recursive_mutex> listenersLock(dbusProxyStatusEvent_.listenersMutex_);
+
+ AvailabilityStatus itsStatus = availabilityStatus_;
+ if (itsStatus != AvailabilityStatus::UNKNOWN)
+ dbusProxyStatusEvent_.notifySpecificListener(_subscription, itsStatus);
+
+ //add listener to list so that it can be notified about a change of availability
+ dbusProxyStatusEvent_.listeners_.push_back(std::make_pair(_subscription, _listener));
+ }
+}
+
void DBusProxy::onDBusServiceInstanceStatus(const AvailabilityStatus& availabilityStatus) {
+ //ensure, proxy survives until notification is done
+ auto itsSelf = selfReference_.lock();
if (availabilityStatus != availabilityStatus_) {
availabilityMutex_.lock();
availabilityStatus_ = availabilityStatus;
@@ -297,7 +346,11 @@ void DBusProxy::onDBusServiceInstanceStatus(const AvailabilityStatus& availabili
availabilityTimeoutCondition_.notify_all();
availabilityTimeoutThreadMutex_.unlock();
- dbusProxyStatusEvent_.notifyListeners(availabilityStatus);
+ {
+ std::lock_guard<std::recursive_mutex> subscribersLock(dbusProxyStatusEvent_.listenersMutex_);
+ for(auto listenerIt : dbusProxyStatusEvent_.listeners_)
+ dbusProxyStatusEvent_.notifySpecificListener(listenerIt.first, availabilityStatus_);
+ }
if (availabilityStatus == AvailabilityStatus::AVAILABLE) {
std::lock_guard < std::mutex > queueLock(signalMemberHandlerQueueMutex_);
@@ -335,9 +388,11 @@ void DBusProxy::onDBusServiceInstanceStatus(const AvailabilityStatus& availabili
{
std::lock_guard < std::mutex > queueLock(selectiveBroadcastHandlersMutex_);
for (auto selectiveBroadcasts : selectiveBroadcastHandlers) {
- std::string methodName = "subscribeFor" + selectiveBroadcasts.first + "Selective";
- connection_->sendPendingSelectiveSubscription(this, methodName, selectiveBroadcasts.second.first,
- selectiveBroadcasts.second.second);
+ connection_->sendPendingSelectiveSubscription(this,
+ selectiveBroadcasts.first,
+ std::get<0>(selectiveBroadcasts.second),
+ std::get<1>(selectiveBroadcasts.second),
+ std::get<2>(selectiveBroadcasts.second));
}
}
} else {
@@ -365,9 +420,11 @@ void DBusProxy::onDBusServiceInstanceStatus(const AvailabilityStatus& availabili
}
void DBusProxy::insertSelectiveSubscription(const std::string& interfaceMemberName,
- DBusProxyConnection::DBusSignalHandler* dbusSignalHandler, uint32_t tag) {
+ DBusProxyConnection::DBusSignalHandler* dbusSignalHandler, uint32_t tag,
+ std::string interfaceMemberSignature) {
std::lock_guard < std::mutex > queueLock(selectiveBroadcastHandlersMutex_);
- selectiveBroadcastHandlers[interfaceMemberName] = std::make_pair(dbusSignalHandler, tag);
+ selectiveBroadcastHandlers[interfaceMemberName] = std::make_tuple(
+ dbusSignalHandler, tag, interfaceMemberSignature);
}
void DBusProxy::subscribeForSelectiveBroadcastOnConnection(
@@ -585,6 +642,5 @@ void DBusProxy::freeDesktopGetCurrentValueForSignalListener(
}
}
-
} // namespace DBus
} // namespace CommonAPI
diff --git a/src/CommonAPI/DBus/DBusProxyManager.cpp b/src/CommonAPI/DBus/DBusProxyManager.cpp
index e36f6e1..cedd58b 100644
--- a/src/CommonAPI/DBus/DBusProxyManager.cpp
+++ b/src/CommonAPI/DBus/DBusProxyManager.cpp
@@ -12,11 +12,12 @@ namespace DBus {
DBusProxyManager::DBusProxyManager(
DBusProxy &_proxy,
- const std::string &_interfaceId)
+ const std::string &_dbusInterfaceId,
+ const std::string &_capiInterfaceId)
: proxy_(_proxy),
- instanceAvailabilityStatusEvent_(_proxy, _interfaceId),
- interfaceId_(_interfaceId),
- registry_(DBusServiceRegistry::get(_proxy.getDBusConnection()))
+ instanceAvailabilityStatusEvent_(_proxy, _dbusInterfaceId, _capiInterfaceId),
+ dbusInterfaceId_(_dbusInterfaceId),
+ capiInterfaceId_(_capiInterfaceId)
{
}
@@ -28,7 +29,7 @@ DBusProxyManager::getDomain() const {
const std::string &
DBusProxyManager::getInterface() const {
- return interfaceId_;
+ return capiInterfaceId_;
}
const ConnectionId_t &
@@ -40,14 +41,16 @@ DBusProxyManager::getConnectionId() const {
void
DBusProxyManager::instancesAsyncCallback(
+ std::shared_ptr<Proxy> _proxy,
const CommonAPI::CallStatus &_status,
- const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict &_dict,
+ const std::vector<DBusAddress> &_availableServiceInstances,
GetAvailableInstancesCallback &_call) {
- std::vector<std::string> result;
+ (void)_proxy;
+ std::vector<std::string> itsAvailableInstances;
if (_status == CommonAPI::CallStatus::SUCCESS) {
- translateCommonApiAddresses(_dict, result);
+ translate(_availableServiceInstances, itsAvailableInstances);
}
- _call(_status, result);
+ _call(_status, itsAvailableInstances);
}
void
@@ -55,98 +58,38 @@ DBusProxyManager::getAvailableInstances(
CommonAPI::CallStatus &_status,
std::vector<std::string> &_availableInstances) {
_availableInstances.clear();
- DBusObjectManagerStub::DBusObjectPathAndInterfacesDict dict;
-
- DBusProxyHelper<
- DBusSerializableArguments<>,
- DBusSerializableArguments<
- DBusObjectManagerStub::DBusObjectPathAndInterfacesDict
- >
- >::callMethodWithReply(proxy_,
- DBusObjectManagerStub::getInterfaceName(),
- "GetManagedObjects",
- "",
- &defaultCallInfo,
- _status,
- dict);
-
- if (_status == CallStatus::SUCCESS) {
- translateCommonApiAddresses(dict, _availableInstances);
- }
+ std::vector<DBusAddress> itsAvailableServiceInstances;
+ instanceAvailabilityStatusEvent_.getAvailableServiceInstances(_status, itsAvailableServiceInstances);
+ translate(itsAvailableServiceInstances, _availableInstances);
}
std::future<CallStatus>
DBusProxyManager::getAvailableInstancesAsync(
GetAvailableInstancesCallback _callback) {
- return CommonAPI::DBus::DBusProxyHelper<
- CommonAPI::DBus::DBusSerializableArguments<>,
- CommonAPI::DBus::DBusSerializableArguments<
- DBusObjectManagerStub::DBusObjectPathAndInterfacesDict
- >
- >::callMethodAsync(
- proxy_,
- DBusObjectManagerStub::getInterfaceName(),
- "GetManagedObjects",
- "",
- &defaultCallInfo,
- std::move(
- std::bind(
- &DBusProxyManager::instancesAsyncCallback,
- this,
- std::placeholders::_1, std::placeholders::_2,
- _callback
- )
- ),
- std::tuple<DBusObjectManagerStub::DBusObjectPathAndInterfacesDict>());
+ return instanceAvailabilityStatusEvent_.getAvailableServiceInstancesAsync(std::bind(
+ &DBusProxyManager::instancesAsyncCallback,
+ this,
+ proxy_.shared_from_this(),
+ std::placeholders::_1,
+ std::placeholders::_2,
+ _callback));
}
void
DBusProxyManager::getInstanceAvailabilityStatus(
- const std::string &_address,
+ const std::string &_instance,
CallStatus &_callStatus,
AvailabilityStatus &_availabilityStatus) {
-
- CommonAPI::Address itsAddress("local", interfaceId_, _address);
- DBusAddress itsDBusAddress;
- DBusAddressTranslator::get()->translate(itsAddress, itsDBusAddress);
-
- _availabilityStatus = AvailabilityStatus::NOT_AVAILABLE;
- if (registry_->isServiceInstanceAlive(
- itsDBusAddress.getInterface(),
- itsDBusAddress.getService(),
- itsDBusAddress.getObjectPath())) {
- _availabilityStatus = AvailabilityStatus::AVAILABLE;
- }
- _callStatus = CallStatus::SUCCESS;
-}
-
-void
-DBusProxyManager::instanceAliveAsyncCallback(
- const AvailabilityStatus &_alive,
- GetInstanceAvailabilityStatusCallback &_call,
- std::shared_ptr<std::promise<CallStatus> > &_status) {
- _call(CallStatus::SUCCESS, _alive);
- _status->set_value(CallStatus::SUCCESS);
+ instanceAvailabilityStatusEvent_.getServiceInstanceAvailabilityStatus(_instance,
+ _callStatus,
+ _availabilityStatus);
}
std::future<CallStatus>
DBusProxyManager::getInstanceAvailabilityStatusAsync(
const std::string &_instance,
GetInstanceAvailabilityStatusCallback _callback) {
-
- CommonAPI::Address itsAddress("local", interfaceId_, _instance);
-
- std::shared_ptr<std::promise<CallStatus> > promise = std::make_shared<std::promise<CallStatus>>();
- registry_->subscribeAvailabilityListener(
- itsAddress.getAddress(),
- std::bind(&DBusProxyManager::instanceAliveAsyncCallback,
- this,
- std::placeholders::_1,
- _callback,
- promise)
- );
-
- return promise->get_future();
+ return instanceAvailabilityStatusEvent_.getServiceInstanceAvailabilityStatusAsync(_instance, _callback);
}
DBusProxyManager::InstanceAvailabilityStatusChangedEvent &
@@ -154,31 +97,12 @@ DBusProxyManager::getInstanceAvailabilityStatusChangedEvent() {
return instanceAvailabilityStatusEvent_;
}
-void
-DBusProxyManager::translateCommonApiAddresses(
- const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict &_dict,
- std::vector<std::string> &_instances) {
-
- CommonAPI::Address itsAddress;
- DBusAddress itsDBusAddress;
-
- // get service information from proxy
- const std::string &_service = proxy_.getDBusAddress().getService();
- itsDBusAddress.setService(_service);
-
- for (const auto &objectPathIter : _dict) {
- itsDBusAddress.setObjectPath(objectPathIter.first);
-
- const auto &interfacesDict = objectPathIter.second;
- for (const auto &interfaceIter : interfacesDict) {
-
- // return only those addresses whose interface matches with ours
- if (interfaceIter.first == interfaceId_) {
- itsDBusAddress.setInterface(interfaceIter.first);
- DBusAddressTranslator::get()->translate(itsDBusAddress, itsAddress);
- _instances.push_back(itsAddress.getInstance());
- }
- }
+void DBusProxyManager::translate(const std::vector<DBusAddress> &_serviceInstances,
+ std::vector<std::string> &_instances) {
+ CommonAPI::Address itsCapiAddress;
+ for(auto itsDbusAddress : _serviceInstances) {
+ DBusAddressTranslator::get()->translate(itsDbusAddress, itsCapiAddress);
+ _instances.push_back(itsCapiAddress.getInstance());
}
}
diff --git a/src/CommonAPI/DBus/DBusServiceRegistry.cpp b/src/CommonAPI/DBus/DBusServiceRegistry.cpp
index 65290fc..d345a9d 100644
--- a/src/CommonAPI/DBus/DBusServiceRegistry.cpp
+++ b/src/CommonAPI/DBus/DBusServiceRegistry.cpp
@@ -46,8 +46,6 @@ DBusServiceRegistry::remove(std::shared_ptr<DBusProxyConnection> _connection) {
DBusServiceRegistry::DBusServiceRegistry(std::shared_ptr<DBusProxyConnection> dbusProxyConnection) :
dbusDaemonProxy_(std::make_shared<CommonAPI::DBus::DBusDaemonProxy>(dbusProxyConnection)),
initialized_(false),
- servicesToResolve(0),
- objectPathsToResolve(0),
notificationThread_() {
}
@@ -57,7 +55,6 @@ DBusServiceRegistry::~DBusServiceRegistry() {
}
dbusDaemonProxy_->getNameOwnerChangedEvent().unsubscribe(dbusDaemonProxyNameOwnerChangedEventSubscription_);
- dbusDaemonProxy_->getProxyStatusEvent().unsubscribe(dbusDaemonProxyStatusEventSubscription_);
// notify only listeners of resolved services (online > offline)
for (auto& dbusServiceListenersIterator : dbusServiceListenersMap) {
@@ -85,10 +82,6 @@ DBusServiceRegistry::~DBusServiceRegistry() {
void DBusServiceRegistry::init() {
translator_ = DBusAddressTranslator::get();
- dbusDaemonProxyStatusEventSubscription_ =
- dbusDaemonProxy_->getProxyStatusEvent().subscribe(
- std::bind(&DBusServiceRegistry::onDBusDaemonProxyStatusEvent, this, std::placeholders::_1));
-
dbusDaemonProxyNameOwnerChangedEventSubscription_ =
dbusDaemonProxy_->getNameOwnerChangedEvent().subscribe(
std::bind(&DBusServiceRegistry::onDBusDaemonProxyNameOwnerChangedEvent,
@@ -173,6 +166,8 @@ DBusServiceRegistry::subscribeAvailabilityListener(
}
dbusInterfaceNameListenersRecord.listenerList.push_front(std::move(serviceListener));
+ dbusInterfaceNameListenersRecord.listenersToRemove.remove(
+ dbusInterfaceNameListenersRecord.listenerList.begin());
dbusServicesMutex_.unlock();
@@ -217,15 +212,8 @@ DBusServiceRegistry::unsubscribeAvailabilityListener(
auto& dbusInterfaceNameListenersRecord = dbusInterfaceNameListenersIterator->second;
- dbusInterfaceNameListenersRecord.listenerList.erase(listenerSubscription);
-
- if (dbusInterfaceNameListenersRecord.listenerList.empty()) {
- dbusInterfaceNameListenersMap.erase(dbusInterfaceNameListenersIterator);
-
- if (dbusInterfaceNameListenersMap.empty()) {
- dbusServiceListenersRecord.dbusObjectPathListenersMap.erase(dbusObjectPathListenersIterator);
- }
- }
+ // mark listener to remove
+ dbusInterfaceNameListenersRecord.listenersToRemove.push_back(listenerSubscription);
dbusServicesMutex_.unlock();
}
@@ -234,6 +222,7 @@ DBusServiceRegistry::unsubscribeAvailabilityListener(
bool DBusServiceRegistry::isServiceInstanceAlive(const std::string& dbusInterfaceName,
const std::string& dbusServiceName,
const std::string& dbusObjectPath) {
+ bool result = false;
std::chrono::milliseconds timeout(1000);
bool uniqueNameFound = false;
@@ -267,9 +256,8 @@ bool DBusServiceRegistry::isServiceInstanceAlive(const std::string& dbusInterfac
std::shared_future<DBusRecordState> futureNameResolved = insertedDbusServiceListenerRecord.first->second.futureOnResolve;
futureNameResolved.wait_for(timeout);
- if(futureNameResolved.get() != DBusRecordState::RESOLVED) {
+ if(futureNameResolved.get() != DBusRecordState::RESOLVED)
return false;
- }
dbusServicesMutex_.lock();
auto dbusServiceListenersMapIterator = dbusServiceListenersMap.find(dbusServiceName);
@@ -296,68 +284,73 @@ bool DBusServiceRegistry::isServiceInstanceAlive(const std::string& dbusInterfac
dbusUniqueNameRecord = &dbusUniqueNameRecordIterator->second;
}
- dbusServicesMutex_.unlock();
-
if (NULL == dbusUniqueNameRecord) {
COMMONAPI_ERROR(std::string(__FUNCTION__), " no unique name record found for IF: ", dbusInterfaceName,
" service: ", dbusServiceName, "object path: ", dbusObjectPath);
}
- auto& dbusObjectPathsCache = dbusUniqueNameRecord->dbusObjectPathsCache;
- auto dbusObjectPathCacheIterator = dbusObjectPathsCache.find(dbusObjectPath);
+ if(dbusPredefinedServices_.find(dbusServiceName) == dbusPredefinedServices_.end()) {
- DBusObjectPathCache* dbusObjectPathCache = NULL;
+ auto& dbusObjectPathsCache = dbusUniqueNameRecord->dbusObjectPathsCache;
+ auto dbusObjectPathCacheIterator = dbusObjectPathsCache.find(dbusObjectPath);
- if(dbusObjectPathCacheIterator != dbusObjectPathsCache.end()) {
- dbusObjectPathCache = &(dbusObjectPathCacheIterator->second);
- if (dbusObjectPathCache->state != DBusRecordState::RESOLVED) {
- dbusObjectPathCache->state = DBusRecordState::RESOLVING;
- dbusServicesMutex_.lock();
+ DBusObjectPathCache* dbusObjectPathCache = NULL;
+ if(dbusObjectPathCacheIterator != dbusObjectPathsCache.end()) {
dbusObjectPathCache = &(dbusObjectPathCacheIterator->second);
+ if (dbusObjectPathCache->state != DBusRecordState::RESOLVED) {
+ dbusObjectPathCache->state = DBusRecordState::RESOLVING;
- std::future<DBusRecordState> futureObjectPathResolved = dbusObjectPathCache->promiseOnResolve.get_future();
- dbusServicesMutex_.unlock();
+ dbusObjectPathCache = &(dbusObjectPathCacheIterator->second);
- introspectDBusObjectPath(uniqueName, dbusObjectPath);
- futureObjectPathResolved.wait_for(timeout);
+ std::future<DBusRecordState> futureObjectPathResolved = dbusObjectPathCache->promiseOnResolve.get_future();
+ dbusServicesMutex_.unlock();
+
+ resolveObjectPathWithObjectManager(uniqueName, dbusObjectPath);
+ futureObjectPathResolved.wait_for(timeout);
+ } else {
+ dbusServicesMutex_.unlock();
+ }
}
- }
- else {
- // try to resolve object paths
- DBusObjectPathCache newDbusObjectPathCache;
- newDbusObjectPathCache.state = DBusRecordState::RESOLVING;
- newDbusObjectPathCache.serviceName = dbusServiceName;
+ else {
+ // try to resolve object paths
+ DBusObjectPathCache newDbusObjectPathCache;
+ newDbusObjectPathCache.state = DBusRecordState::RESOLVING;
+ newDbusObjectPathCache.serviceName = dbusServiceName;
- dbusServicesMutex_.lock();
+ dbusObjectPathsCache.insert(std::make_pair(dbusObjectPath, std::move(newDbusObjectPathCache)));
- dbusObjectPathsCache.insert(std::make_pair(dbusObjectPath, std::move(newDbusObjectPathCache)));
+ dbusObjectPathCacheIterator = dbusObjectPathsCache.find(dbusObjectPath);
- dbusObjectPathCacheIterator = dbusObjectPathsCache.find(dbusObjectPath);
+ dbusObjectPathCache = &(dbusObjectPathCacheIterator->second);
- dbusObjectPathCache = &(dbusObjectPathCacheIterator->second);
+ newDbusObjectPathCache.futureOnResolve = dbusObjectPathCache->promiseOnResolve.get_future();
+ dbusServicesMutex_.unlock();
- newDbusObjectPathCache.futureOnResolve = dbusObjectPathCache->promiseOnResolve.get_future();
- dbusServicesMutex_.unlock();
+ resolveObjectPathWithObjectManager(uniqueName, dbusObjectPath);
+ newDbusObjectPathCache.futureOnResolve.wait_for(timeout);
+ }
- introspectDBusObjectPath(uniqueName, dbusObjectPath);
- newDbusObjectPathCache.futureOnResolve.wait_for(timeout);
- }
+ if (NULL == dbusObjectPathCache) {
+ COMMONAPI_ERROR(std::string(__FUNCTION__), " no object path cache entry found for IF: ", dbusInterfaceName,
+ " service: ", dbusServiceName, "object path: ", dbusObjectPath);
+ }
- if (NULL == dbusObjectPathCache) {
- COMMONAPI_ERROR(std::string(__FUNCTION__), " no object path cache entry found for IF: ", dbusInterfaceName,
- " service: ", dbusServiceName, "object path: ", dbusObjectPath);
- }
+ dbusServicesMutex_.lock();
+ if(dbusObjectPathCache->state != DBusRecordState::RESOLVED) {
+ dbusServicesMutex_.unlock();
+ return false;
+ }
- dbusServicesMutex_.lock();
- if(dbusObjectPathCache->state != DBusRecordState::RESOLVED) {
+ auto dbusInterfaceNamesIterator = dbusObjectPathCache->dbusInterfaceNamesCache.find(dbusInterfaceName);
+ result = dbusInterfaceNamesIterator != dbusObjectPathCache->dbusInterfaceNamesCache.end();
dbusServicesMutex_.unlock();
- return false;
- }
- auto dbusInterfaceNamesIterator = dbusObjectPathCache->dbusInterfaceNamesCache.find(dbusInterfaceName);
- bool result = dbusInterfaceNamesIterator != dbusObjectPathCache->dbusInterfaceNamesCache.end();
- dbusServicesMutex_.unlock();
+ } else {
+ //service is predefined
+ result = true;
+ dbusServicesMutex_.unlock();
+ }
return(result);
}
@@ -372,6 +365,7 @@ void DBusServiceRegistry::fetchAllServiceNames() {
dbusDaemonProxy_->listNames(callStatus, availableServiceNames);
+ dbusServicesMutex_.lock();
if (callStatus == CallStatus::SUCCESS) {
for(std::string serviceName : availableServiceNames) {
if(isDBusServiceName(serviceName)) {
@@ -379,114 +373,27 @@ void DBusServiceRegistry::fetchAllServiceNames() {
}
}
}
+ dbusServicesMutex_.unlock();
}
-// d-feet mode
-std::vector<std::string> DBusServiceRegistry::getAvailableServiceInstances(const std::string& interfaceName,
- const std::string& domainName) {
- (void)domainName;
- std::vector<std::string> availableServiceInstances;
-
- // resolve all service names
- for (auto serviceNameIterator = dbusServiceNameMap_.begin();
- serviceNameIterator != dbusServiceNameMap_.end();
- serviceNameIterator++) {
-
- std::string serviceName = serviceNameIterator->first;
- DBusUniqueNameRecord* dbusUniqueNameRecord = serviceNameIterator->second;
-
- if(dbusUniqueNameRecord == NULL) {
- DBusServiceListenersRecord& serviceListenerRecord = dbusServiceListenersMap[serviceName];
- if(serviceListenerRecord.uniqueBusNameState != DBusRecordState::RESOLVING) {
- resolveDBusServiceName(serviceName, serviceListenerRecord);
- }
- }
- }
-
- std::mutex mutexResolveAllServices;
- std::unique_lock<std::mutex> lockResolveAllServices(mutexResolveAllServices);
- std::chrono::milliseconds timeout(5000);
-
- monitorResolveAllServices_.wait_for(lockResolveAllServices, timeout, [&] {
- mutexServiceResolveCount.lock();
- bool finished = servicesToResolve == 0;
- mutexServiceResolveCount.unlock();
-
- return finished;
- });
-
- for (auto serviceNameIterator = dbusServiceNameMap_.begin();
- serviceNameIterator != dbusServiceNameMap_.end();
- serviceNameIterator++) {
-
- std::string serviceName = serviceNameIterator->first;
- DBusUniqueNameRecord* dbusUniqueNameRecord = serviceNameIterator->second;
-
- if(dbusUniqueNameRecord != NULL) {
- if(dbusUniqueNameRecord->objectPathsState == DBusRecordState::UNKNOWN) {
- DBusObjectPathCache& rootObjectPathCache = dbusUniqueNameRecord->dbusObjectPathsCache["/"];
- if(rootObjectPathCache.state == DBusRecordState::UNKNOWN) {
- rootObjectPathCache.state = DBusRecordState::RESOLVING;
- introspectDBusObjectPath(dbusUniqueNameRecord->uniqueName, "/");
- }
- }
- }
- }
-
- std::mutex mutexResolveAllObjectPaths;
- std::unique_lock<std::mutex> lockResolveAllObjectPaths(mutexResolveAllObjectPaths);
-
- // TODO: Check if should use the remaining timeout not "used" during wait before
- monitorResolveAllObjectPaths_.wait_for(lockResolveAllObjectPaths, timeout, [&] {
- mutexServiceResolveCount.lock();
- bool finished = objectPathsToResolve == 0;
- mutexServiceResolveCount.unlock();
-
- return finished;
- });
-
- for (auto serviceNameIterator = dbusServiceNameMap_.begin();
- serviceNameIterator != dbusServiceNameMap_.end();
- serviceNameIterator++) {
-
- std::string serviceName = serviceNameIterator->first;
- DBusUniqueNameRecord* dbusUniqueNameRecord = serviceNameIterator->second;
-
- if(dbusUniqueNameRecord != NULL) {
- if(dbusUniqueNameRecord->objectPathsState == DBusRecordState::RESOLVED) {
- for (auto dbusObjectPathCacheIterator = dbusUniqueNameRecord->dbusObjectPathsCache.begin();
- dbusObjectPathCacheIterator != dbusUniqueNameRecord->dbusObjectPathsCache.end();
- dbusObjectPathCacheIterator++) {
- if (dbusObjectPathCacheIterator->second.state == DBusRecordState::RESOLVED) {
- if (dbusObjectPathCacheIterator->second.dbusInterfaceNamesCache.find(interfaceName)
- != dbusObjectPathCacheIterator->second.dbusInterfaceNamesCache.end()) {
- std::string commonApiAddress;
- translator_->translate(
- dbusObjectPathCacheIterator->first + "/" + interfaceName + "/" + serviceName, commonApiAddress);
- availableServiceInstances.push_back(commonApiAddress);
- }
- }
- }
- }
- }
- }
-
- // maybe partial list but it contains everything we know for now
- return availableServiceInstances;
+void DBusServiceRegistry::getAvailableServiceInstances(const std::string& dbusServiceName,
+ const std::string& dbusObjectPath,
+ DBusObjectManagerStub::DBusObjectPathAndInterfacesDict& availableServiceInstances) {
+ getManagedObjects(dbusServiceName, dbusObjectPath, availableServiceInstances);
}
-void DBusServiceRegistry::getAvailableServiceInstancesAsync(CommonAPI::Factory::AvailableInstancesCbk_t _cbk,
- const std::string &_interface,
- const std::string &_domain) {
- //Necessary as service discovery might need some time, but the async version of "getAvailableServiceInstances"
- //shall return without delay.
- std::thread(
- [this, _cbk, _interface, _domain](std::shared_ptr<DBusServiceRegistry> selfRef) {
- (void)selfRef;
- auto instances = getAvailableServiceInstances(_interface, _domain);
- _cbk(instances);
- }, this->shared_from_this()
- ).detach();
+void DBusServiceRegistry::getAvailableServiceInstancesAsync(GetAvailableServiceInstancesCallback callback,
+ const std::string& dbusServiceName,
+ const std::string& dbusObjectPath) {
+ getManagedObjectsAsync(dbusServiceName, dbusObjectPath, [callback](const CallStatus& callStatus,
+ const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict availableServiceInstances,
+ const std::string& dbusServiceName,
+ const std::string& dbusObjectPath) {
+ (void)callStatus;
+ (void)dbusServiceName;
+ (void)dbusObjectPath;
+ callback(availableServiceInstances);
+ });
}
void DBusServiceRegistry::onSignalDBusMessage(const DBusMessage &_dbusMessage) {
@@ -498,7 +405,8 @@ void DBusServiceRegistry::onSignalDBusMessage(const DBusMessage &_dbusMessage) {
if (!_dbusMessage.hasInterfaceName("org.freedesktop.DBus.ObjectManager")) {
COMMONAPI_ERROR(std::string(__FUNCTION__), " unexpected interface ", _dbusMessage.getInterface());
}
- if (!_dbusMessage.hasMemberName("InterfacesAdded") && !_dbusMessage.hasMemberName("InterfacesAdded")) {
+ if (!_dbusMessage.hasMemberName("InterfacesAdded") && !_dbusMessage.hasMemberName("InterfacesAdded") &&
+ !_dbusMessage.hasMemberName("InterfacesRemoved") && !_dbusMessage.hasMemberName("InterfacesRemoved") ) {
COMMONAPI_ERROR(std::string(__FUNCTION__), " unexpected member ", _dbusMessage.getMember());
}
@@ -598,10 +506,6 @@ void DBusServiceRegistry::resolveDBusServiceName(const std::string& dbusServiceN
COMMONAPI_ERROR(std::string(__FUNCTION__), " unique name not empty ", dbusServiceListenersRecord.uniqueBusName);
}
- mutexServiceResolveCount.lock();
- servicesToResolve++;
- mutexServiceResolveCount.unlock();
-
if (dbusDaemonProxy_->isAvailable()) {
auto func = std::bind(
@@ -648,11 +552,6 @@ void DBusServiceRegistry::onGetNameOwnerCallback(const CallStatus& status,
onDBusServiceNotAvailable(dbusServiceListenersRecord, dbusServiceName);
}
- mutexServiceResolveCount.lock();
- servicesToResolve--;
- mutexServiceResolveCount.unlock();
- monitorResolveAllServices_.notify_all();
-
dbusServicesMutex_.unlock();
}
@@ -788,28 +687,51 @@ bool DBusServiceRegistry::resolveObjectPathWithObjectManager(const std::string&
std::placeholders::_2,
dbusServiceUniqueName,
dbusObjectPath);
- return getManagedObjects(dbusServiceUniqueName, "/", getManagedObjectsCallback);
+ return getManagedObjectsAsync(dbusServiceUniqueName, "/", getManagedObjectsCallback);
}
-bool DBusServiceRegistry::getManagedObjects(const std::string& dbusServiceUniqueName,
- const std::string& dbusObjectPath,
- GetManagedObjectsCallback callback) {
- bool isSendingInProgress = false;
+bool DBusServiceRegistry::getManagedObjects(const std::string& dbusServiceName,
+ const std::string& dbusObjectPath,
+ DBusObjectManagerStub::DBusObjectPathAndInterfacesDict& availableServiceInstances) {
auto dbusConnection = dbusDaemonProxy_->getDBusConnection();
- if (dbusServiceUniqueName.empty()) {
- COMMONAPI_ERROR(std::string(__FUNCTION__), " dbusServiceUniqueName empty");
+ if (dbusServiceName.empty()) {
+ COMMONAPI_ERROR(std::string(__FUNCTION__), " dbusServiceName empty");
}
if(dbusConnection->isConnected()) {
- if(dbusObjectPath != "/") {
- mutexObjectPathsResolveCount.lock();
- objectPathsToResolve++;
- mutexObjectPathsResolveCount.unlock();
- }
+ DBusAddress dbusAddress(dbusServiceName, dbusObjectPath, "org.freedesktop.DBus.ObjectManager");
+ DBusMessage dbusMessageCall = CommonAPI::DBus::DBusMessage::createMethodCall(
+ dbusAddress,
+ "GetManagedObjects");
+
+ DBusError error;
+ CallInfo* defaultCallInfo = new CallInfo();
+ DBusMessage reply = dbusConnection->sendDBusMessageWithReplyAndBlock(dbusMessageCall, error, defaultCallInfo);
+ delete defaultCallInfo;
+
+ DBusInputStream input(reply);
+ if (!DBusSerializableArguments<DBusObjectManagerStub::DBusObjectPathAndInterfacesDict>::deserialize(
+ input, availableServiceInstances) || error)
+ return false;
+ }
+ return true;
+}
+
+bool DBusServiceRegistry::getManagedObjectsAsync(const std::string& dbusServiceName,
+ const std::string& dbusObjectPath,
+ GetManagedObjectsCallback callback) {
+ bool isSendingInProgress = false;
+ auto dbusConnection = dbusDaemonProxy_->getDBusConnection();
+
+ if (dbusServiceName.empty()) {
+ COMMONAPI_ERROR(std::string(__FUNCTION__), " dbusServiceName empty");
+ }
+
+ if(dbusConnection->isConnected()) {
- DBusAddress dbusAddress(dbusServiceUniqueName, dbusObjectPath, "org.freedesktop.DBus.ObjectManager");
+ DBusAddress dbusAddress(dbusServiceName, dbusObjectPath, "org.freedesktop.DBus.ObjectManager");
DBusMessage dbusMessageCall = CommonAPI::DBus::DBusMessage::createMethodCall(
dbusAddress,
"GetManagedObjects");
@@ -818,7 +740,7 @@ bool DBusServiceRegistry::getManagedObjects(const std::string& dbusServiceUnique
callback,
std::placeholders::_1,
std::placeholders::_2,
- dbusServiceUniqueName,
+ dbusServiceName,
dbusObjectPath);
DBusProxyAsyncCallbackHandler<
@@ -840,14 +762,14 @@ bool DBusServiceRegistry::getManagedObjects(const std::string& dbusServiceUnique
}
void DBusServiceRegistry::onGetManagedObjectsCallbackResolve(const CallStatus& callStatus,
- const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict dbusObjectPathAndInterfacesDict,
+ const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict availableServiceInstances,
const std::string& dbusServiceUniqueName,
const std::string& dbusObjectPath) {
if(callStatus == CallStatus::SUCCESS) {
//has object manager
bool objectPathFound = false;
- for(auto objectPathDict : dbusObjectPathAndInterfacesDict)
+ for(auto objectPathDict : availableServiceInstances)
{
std::string objectPath = objectPathDict.first;
if(objectPath != dbusObjectPath)
@@ -863,16 +785,6 @@ void DBusServiceRegistry::onGetManagedObjectsCallbackResolve(const CallStatus& c
processManagedObject(dbusObjectPath, dbusServiceUniqueName, interfaceName);
dbusServicesMutex_.unlock();
}
-
- // resolve further managed objects
- auto callback = std::bind(
- &DBusServiceRegistry::onGetManagedObjectsCallbackResolveFurther,
- this->shared_from_this(),
- std::placeholders::_1,
- std::placeholders::_2,
- std::placeholders::_3,
- std::placeholders::_4);
- getManagedObjects(dbusServiceUniqueName, dbusObjectPath, callback);
}
if(!objectPathFound) {
@@ -885,193 +797,41 @@ void DBusServiceRegistry::onGetManagedObjectsCallbackResolve(const CallStatus& c
dbusServiceUniqueName,
dbusObjectPath);
std::string objectPathManager = dbusObjectPath.substr(0, dbusObjectPath.find_last_of("\\/"));
- getManagedObjects(dbusServiceUniqueName, objectPathManager, getManagedObjectsCallback);
+ getManagedObjectsAsync(dbusServiceUniqueName, objectPathManager, getManagedObjectsCallback);
}
} else {
COMMONAPI_ERROR("There is no Object Manager that manages " + dbusObjectPath + ". Resolving failed!");
}
}
-void DBusServiceRegistry::onGetManagedObjectsCallbackResolveFurther(const CallStatus& callStatus,
- const DBusObjectManagerStub::DBusObjectPathAndInterfacesDict dbusObjectPathAndInterfacesDict,
- const std::string& dbusServiceUniqueName,
- const std::string& dbusObjectPath) {
-
- if(callStatus == CallStatus::SUCCESS) {
- for(auto objectPathDict : dbusObjectPathAndInterfacesDict)
- {
- //resolve
- std::string objectPath = objectPathDict.first;
- CommonAPI::DBus::DBusObjectManagerStub::DBusInterfacesAndPropertiesDict interfacesAndPropertiesDict = objectPathDict.second;
- for(auto interfaceDict : interfacesAndPropertiesDict)
- {
- std::string interfaceName = interfaceDict.first;
- dbusServicesMutex_.lock();
- processManagedObject(objectPath, dbusServiceUniqueName, interfaceName);
- dbusServicesMutex_.unlock();
- }
-
- // resolve further managed objects
- auto callback = std::bind(
- &DBusServiceRegistry::onGetManagedObjectsCallbackResolveFurther,
- this->shared_from_this(),
- std::placeholders::_1,
- std::placeholders::_2,
- std::placeholders::_3,
- std::placeholders::_4);
- getManagedObjects(dbusServiceUniqueName, objectPath, callback);
- }
- } else {
- // No further managed objects
- }
-
- dbusServicesMutex_.lock();
+void DBusServiceRegistry::processManagedObject(const std::string& dbusObjectPath,
+ const std::string& dbusServiceUniqueName,
+ const std::string& interfaceName) {
auto dbusServiceUniqueNameIterator = dbusUniqueNamesMap_.find(dbusServiceUniqueName);
const bool isDBusServiceUniqueNameFound = (dbusServiceUniqueNameIterator != dbusUniqueNamesMap_.end());
- if (!isDBusServiceUniqueNameFound) {
- dbusServicesMutex_.unlock();
+ if (!isDBusServiceUniqueNameFound)
return;
- }
DBusUniqueNameRecord& dbusUniqueNameRecord = dbusServiceUniqueNameIterator->second;
auto dbusObjectPathIterator = dbusUniqueNameRecord.dbusObjectPathsCache.find(dbusObjectPath);
const bool isDBusObjectPathFound = (dbusObjectPathIterator != dbusUniqueNameRecord.dbusObjectPathsCache.end());
- if (!isDBusObjectPathFound) {
- dbusServicesMutex_.unlock();
+ if (!isDBusObjectPathFound)
return;
- }
DBusObjectPathCache& dbusObjectPathRecord = dbusObjectPathIterator->second;
- dbusObjectPathRecord.state = DBusRecordState::RESOLVED;
- if(dbusObjectPathRecord.futureOnResolve.valid()) {
- dbusObjectPathRecord.promiseOnResolve.set_value(dbusObjectPathRecord.state);
- }
- mutexObjectPathsResolveCount.lock();
- objectPathsToResolve--;
- mutexObjectPathsResolveCount.unlock();
- monitorResolveAllObjectPaths_.notify_all();
-
- dbusUniqueNameRecord.objectPathsState = DBusRecordState::RESOLVED;
-
- notifyDBusServiceListeners(
- dbusUniqueNameRecord,
- dbusObjectPath,
- dbusObjectPathRecord.dbusInterfaceNamesCache,
- DBusRecordState::RESOLVED);
-
- dbusServicesMutex_.unlock();
-}
-
-void DBusServiceRegistry::processManagedObject(const std::string& dbusObjectPath,
- const std::string& dbusServiceUniqueName,
- const std::string& interfaceName) {
- DBusUniqueNameRecord& dbusUniqueNameRecord = dbusUniqueNamesMap_[dbusServiceUniqueName];
- DBusObjectPathCache& dbusObjectPathCache = dbusUniqueNameRecord.dbusObjectPathsCache[dbusObjectPath];
-
if(!isOrgFreedesktopDBusInterface(interfaceName)) {
- dbusObjectPathCache.dbusInterfaceNamesCache.insert(interfaceName);
+ dbusObjectPathRecord.dbusInterfaceNamesCache.insert(interfaceName);
} else if (translator_->isOrgFreedesktopDBusPeerMapped() && (interfaceName == "org.freedesktop.DBus.Peer")) {
- dbusObjectPathCache.dbusInterfaceNamesCache.insert(interfaceName);
- }
-}
-
-bool DBusServiceRegistry::introspectDBusObjectPath(const std::string& dbusServiceUniqueName,
- const std::string& dbusObjectPath) {
- bool isResolvingInProgress = false;
- auto dbusConnection = dbusDaemonProxy_->getDBusConnection();
-
- if (dbusServiceUniqueName.empty()) {
- COMMONAPI_ERROR(std::string(__FUNCTION__), " dbusServiceUniqueName empty");
+ dbusObjectPathRecord.dbusInterfaceNamesCache.insert(interfaceName);
}
- if (dbusConnection->isConnected()) {
- mutexObjectPathsResolveCount.lock();
- objectPathsToResolve++;
- mutexObjectPathsResolveCount.unlock();
-
- DBusAddress dbusAddress(dbusServiceUniqueName, dbusObjectPath, "org.freedesktop.DBus.Introspectable");
- DBusMessage dbusMessageCall = DBusMessage::createMethodCall(
- dbusAddress,
- "Introspect");
- auto instrospectAsyncCallback = std::bind(
- &DBusServiceRegistry::onIntrospectCallback,
- this->shared_from_this(),
- std::placeholders::_1,
- std::placeholders::_2,
- dbusServiceUniqueName,
- dbusObjectPath);
-
- DBusProxyAsyncCallbackHandler<
- DBusServiceRegistry,
- std::string
- >::Delegate delegate(shared_from_this(), instrospectAsyncCallback);
-
- dbusConnection->sendDBusMessageWithReplyAsync(
- dbusMessageCall,
- DBusProxyAsyncCallbackHandler<
- DBusServiceRegistry,
- std::string
- >::create(delegate, std::tuple<std::string>()),
- &serviceRegistryInfo);
-
- isResolvingInProgress = true;
- }
- return isResolvingInProgress;
-}
-
-/**
- * Callback for org.freedesktop.DBus.Introspectable.Introspect
- *
- * This is the other end of checking if a dbus object path is available.
- * On success it'll extract all interface names from the xml data response.
- * Special interfaces that start with "org.freedesktop.DBus." will be ignored.
- *
- * @param status
- * @param xmlData
- * @param dbusServiceUniqueName
- * @param dbusObjectPath
- */
-void DBusServiceRegistry::onIntrospectCallback(const CallStatus& callStatus,
- std::string xmlData,
- const std::string& dbusServiceUniqueName,
- const std::string& dbusObjectPath) {
- if (callStatus == CallStatus::SUCCESS) {
- parseIntrospectionData(xmlData, dbusObjectPath, dbusServiceUniqueName);
- }
-
- dbusServicesMutex_.lock();
-
- // Error CallStatus will result in empty parsedDBusInterfaceNameSet (and not available notification)
-
- auto dbusServiceUniqueNameIterator = dbusUniqueNamesMap_.find(dbusServiceUniqueName);
- const bool isDBusServiceUniqueNameFound = (dbusServiceUniqueNameIterator != dbusUniqueNamesMap_.end());
-
- if (!isDBusServiceUniqueNameFound) {
- dbusServicesMutex_.unlock();
- return;
- }
-
- DBusUniqueNameRecord& dbusUniqueNameRecord = dbusServiceUniqueNameIterator->second;
- auto dbusObjectPathIterator = dbusUniqueNameRecord.dbusObjectPathsCache.find(dbusObjectPath);
- const bool isDBusObjectPathFound = (dbusObjectPathIterator != dbusUniqueNameRecord.dbusObjectPathsCache.end());
-
- if (!isDBusObjectPathFound) {
- dbusServicesMutex_.unlock();
- return;
- }
-
- DBusObjectPathCache& dbusObjectPathRecord = dbusObjectPathIterator->second;
-
dbusObjectPathRecord.state = DBusRecordState::RESOLVED;
- dbusObjectPathRecord.promiseOnResolve.set_value(dbusObjectPathRecord.state);
- mutexObjectPathsResolveCount.lock();
- objectPathsToResolve--;
- mutexObjectPathsResolveCount.unlock();
- monitorResolveAllObjectPaths_.notify_all();
+ if(dbusObjectPathRecord.futureOnResolve.valid())
+ dbusObjectPathRecord.promiseOnResolve.set_value(dbusObjectPathRecord.state);
dbusUniqueNameRecord.objectPathsState = DBusRecordState::RESOLVED;
@@ -1080,104 +840,6 @@ void DBusServiceRegistry::onIntrospectCallback(const CallStatus& callStatus,
dbusObjectPath,
dbusObjectPathRecord.dbusInterfaceNamesCache,
DBusRecordState::RESOLVED);
-
- dbusServicesMutex_.unlock();
-}
-
-void DBusServiceRegistry::parseIntrospectionNode(const pugi::xml_node& node, const std::string& rootObjectPath, const std::string& fullObjectPath, const std::string& dbusServiceUniqueName) {
- std::string nodeName;
-
- for(pugi::xml_node& subNode : node.children()) {
- nodeName = std::string(subNode.name());
-
- if(nodeName == "node") {
- processIntrospectionObjectPath(subNode, rootObjectPath, dbusServiceUniqueName);
- }
-
- if(nodeName == "interface") {
- processIntrospectionInterface(subNode, rootObjectPath, fullObjectPath, dbusServiceUniqueName);
- }
- }
-}
-
-void DBusServiceRegistry::processIntrospectionObjectPath(const pugi::xml_node& node, const std::string& rootObjectPath, const std::string& dbusServiceUniqueName) {
- std::string fullObjectPath = rootObjectPath;
-
- if(fullObjectPath.at(fullObjectPath.length()-1) != '/') {
- fullObjectPath += "/";
- }
-
- fullObjectPath += std::string(node.attribute("name").as_string());
-
- DBusUniqueNameRecord& dbusUniqueNameRecord = dbusUniqueNamesMap_[dbusServiceUniqueName];
- DBusObjectPathCache& dbusObjectPathCache = dbusUniqueNameRecord.dbusObjectPathsCache[fullObjectPath];
-
- if(dbusObjectPathCache.state == DBusRecordState::UNKNOWN) {
- dbusObjectPathCache.state = DBusRecordState::RESOLVING;
- introspectDBusObjectPath(dbusServiceUniqueName, fullObjectPath);
- }
-
- for(pugi::xml_node subNode : node.children()) {
- parseIntrospectionNode(subNode, fullObjectPath, fullObjectPath, dbusServiceUniqueName);
- }
-}
-
-void DBusServiceRegistry::processIntrospectionInterface(const pugi::xml_node& node, const std::string& rootObjectPath, const std::string& fullObjectPath, const std::string& dbusServiceUniqueName) {
- std::string interfaceName = node.attribute("name").as_string();
- DBusUniqueNameRecord& dbusUniqueNameRecord = dbusUniqueNamesMap_[dbusServiceUniqueName];
- DBusObjectPathCache& dbusObjectPathCache = dbusUniqueNameRecord.dbusObjectPathsCache[fullObjectPath];
-
- if(!isOrgFreedesktopDBusInterface(interfaceName)) {
- dbusObjectPathCache.dbusInterfaceNamesCache.insert(interfaceName);
- } else if (translator_->isOrgFreedesktopDBusPeerMapped() && (interfaceName == "org.freedesktop.DBus.Peer")) {
- dbusObjectPathCache.dbusInterfaceNamesCache.insert(interfaceName);
- }
-
- for(pugi::xml_node subNode : node.children()) {
- parseIntrospectionNode(subNode, rootObjectPath, fullObjectPath, dbusServiceUniqueName);
- }
-}
-
-void DBusServiceRegistry::parseIntrospectionData(const std::string& xmlData,
- const std::string& rootObjectPath,
- const std::string& dbusServiceUniqueName) {
- pugi::xml_document xmlDocument;
- pugi::xml_parse_result parsedResult = xmlDocument.load_buffer(xmlData.c_str(), xmlData.length(), pugi::parse_minimal, pugi::encoding_utf8);
-
- if(parsedResult.status != pugi::xml_parse_status::status_ok) {
- return;
- }
-
- const pugi::xml_node rootNode = xmlDocument.child("node");
-
- dbusServicesMutex_.lock();
-
- parseIntrospectionNode(rootNode, rootObjectPath, rootObjectPath, dbusServiceUniqueName);
-
- dbusUniqueNamesMap_[dbusServiceUniqueName];
- dbusServicesMutex_.unlock();
-}
-
-
-void DBusServiceRegistry::onDBusDaemonProxyStatusEvent(const AvailabilityStatus& availabilityStatus) {
- if (availabilityStatus == AvailabilityStatus::UNKNOWN) {
- COMMONAPI_ERROR(std::string(__FUNCTION__), " unexpected availability status ", int(availabilityStatus));
- }
-
- dbusServicesMutex_.lock();
-
- for (auto& dbusServiceListenersIterator : dbusServiceListenersMap) {
- const auto& dbusServiceName = dbusServiceListenersIterator.first;
- auto& dbusServiceListenersRecord = dbusServiceListenersIterator.second;
-
- if (availabilityStatus == AvailabilityStatus::AVAILABLE) {
- resolveDBusServiceName(dbusServiceName, dbusServiceListenersRecord);
- } else {
- onDBusServiceNotAvailable(dbusServiceListenersRecord, dbusServiceName);
- }
- }
-
- dbusServicesMutex_.unlock();
}
void DBusServiceRegistry::checkDBusServiceWasAvailable(const std::string& dbusServiceName,
@@ -1378,6 +1040,9 @@ void DBusServiceRegistry::notifyDBusObjectPathChanged(DBusInterfaceNameListeners
auto& dbusInterfaceNameListenersRecord = dbusInterfaceNameListenersIterator->second;
notifyDBusInterfaceNameListeners(dbusInterfaceNameListenersRecord, isDBusInterfaceNameAvailable);
+
+ if (dbusInterfaceNameListenersRecord.listenerList.empty())
+ dbusInterfaceNameListenersMap.erase(dbusInterfaceNameListenersIterator);
}
}
}
@@ -1394,10 +1059,20 @@ void DBusServiceRegistry::notifyDBusInterfaceNameListeners(DBusInterfaceNameList
}
dbusInterfaceNameListenersRecord.state = notifyState;
- for (auto dbusServiceListenerIterator = dbusInterfaceNameListenersRecord.listenerList.begin();
- dbusServiceListenerIterator != dbusInterfaceNameListenersRecord.listenerList.end();
- dbusServiceListenerIterator++) {
- (*dbusServiceListenerIterator)(availabilityStatus);
+ auto dbusServiceListenerIterator = dbusInterfaceNameListenersRecord.listenerList.begin();
+ while (dbusServiceListenerIterator != dbusInterfaceNameListenersRecord.listenerList.end()) {
+
+ auto itsRemoveListenerIt = std::find(dbusInterfaceNameListenersRecord.listenersToRemove.begin(),
+ dbusInterfaceNameListenersRecord.listenersToRemove.end(),
+ dbusServiceListenerIterator);
+
+ if(itsRemoveListenerIt != dbusInterfaceNameListenersRecord.listenersToRemove.end()) {
+ dbusInterfaceNameListenersRecord.listenersToRemove.remove(dbusServiceListenerIterator);
+ dbusServiceListenerIterator = dbusInterfaceNameListenersRecord.listenerList.erase(dbusServiceListenerIterator);
+ } else {
+ (*dbusServiceListenerIterator)(availabilityStatus);
+ ++dbusServiceListenerIterator;
+ }
}
}
diff --git a/src/CommonAPI/DBus/DBusStubAdapter.cpp b/src/CommonAPI/DBus/DBusStubAdapter.cpp
index 422a419..0f86cc1 100644
--- a/src/CommonAPI/DBus/DBusStubAdapter.cpp
+++ b/src/CommonAPI/DBus/DBusStubAdapter.cpp
@@ -16,12 +16,11 @@ DBusStubAdapter::DBusStubAdapter(const DBusAddress &_dbusAddress,
: dbusAddress_(_dbusAddress),
connection_(_connection),
isManaging_(_isManaging) {
- Factory::get()->incrementConnection(connection_);
}
DBusStubAdapter::~DBusStubAdapter() {
deinit();
- Factory::get()->decrementConnection(connection_);
+ Factory::get()->unregisterStub(address_.getDomain(), address_.getInterface(), address_.getInstance());
}
void DBusStubAdapter::init(std::shared_ptr<DBusStubAdapter> _instance) {