summaryrefslogtreecommitdiff
path: root/src/CommonAPI/DBus/DBusConnection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/CommonAPI/DBus/DBusConnection.cpp')
-rw-r--r--src/CommonAPI/DBus/DBusConnection.cpp301
1 files changed, 219 insertions, 82 deletions
diff --git a/src/CommonAPI/DBus/DBusConnection.cpp b/src/CommonAPI/DBus/DBusConnection.cpp
index 0ce70bc..003aea9 100644
--- a/src/CommonAPI/DBus/DBusConnection.cpp
+++ b/src/CommonAPI/DBus/DBusConnection.cpp
@@ -16,7 +16,6 @@
#include <CommonAPI/DBus/DBusProxy.hpp>
#include <CommonAPI/DBus/DBusAddressTranslator.hpp>
-
namespace CommonAPI {
namespace DBus {
@@ -46,6 +45,93 @@ void DBusConnectionStatusEvent::onListenerAdded(const Listener &_listener, const
_listener(AvailabilityStatus::AVAILABLE);
}
+// Helper class to ensure that CommonAPI::Runtime (static instance) is not destroyed unless last
+// connection is closed. Also tries to safely join dispatch threads (~CompletionHelper will be
+// executed in main thread's context on program exit), in case DBusConnection::disconnect() has
+// been invoked from dispatch thread itself, thus "self-joining" would have raised an exception.
+class CompletionHelper {
+public:
+
+ static std::unique_ptr<CompletionHelper> & get() {
+ static std::unique_ptr<CompletionHelper> theCompleter = std::unique_ptr<CompletionHelper>(new CompletionHelper());
+ return theCompleter;
+ }
+
+ ~CompletionHelper() {
+ destructing_ = true;
+
+ std::set<std::uintptr_t>::size_type activeConnections(connections_.max_size());
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ activeConnections = connections_.size();
+ }
+
+ bool forceDetach(false);
+ if (0u != activeConnections) {
+ std::future<bool> ready = readyToCleanup_.get_future();
+ if (ready.valid()) {
+ const std::future_status status = ready.wait_for(std::chrono::seconds(1));
+ forceDetach = (std::future_status::ready != status);
+ }
+ }
+
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ for (std::thread * p : threads_) {
+ if (nullptr != p) {
+ if (!forceDetach && p->joinable()) {
+ p->join();
+ } else {
+ p->detach();
+ }
+ delete p;
+ }
+ }
+ }
+ };
+
+ void registerConnection(std::uintptr_t conn) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ connections_.insert(conn);
+ }
+
+ void unregisterConnection(std::uintptr_t conn) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ if ((1u == connections_.erase(conn)) && (0u == connections_.size()) && destructing_) {
+ readyToCleanup_.set_value(true);
+ }
+ }
+
+ void joinOnExit(std::uintptr_t conn, std::thread & t) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ if (connections_.end() != connections_.find(conn)) {
+ bool found(false);
+ std::vector<std::thread*>::const_iterator it = threads_.begin();
+ while (!found && it != threads_.end()) {
+ found = (&t == *it++);
+ }
+ if (!found) {
+ threads_.push_back(&t);
+ }
+ }
+ }
+
+private:
+
+ CompletionHelper()
+ : destructing_(false) {
+ }
+
+ CompletionHelper(CompletionHelper const &) = delete;
+ void operator= (CompletionHelper const &) = delete;
+
+ std::mutex mutex_;
+ std::vector<std::thread *> threads_;
+ std::set<std::uintptr_t> connections_;
+
+ std::promise<bool> readyToCleanup_;
+ std::atomic<bool> destructing_;
+};
const DBusObjectPathVTable* DBusConnection::getDBusObjectPathVTable() {
static const DBusObjectPathVTable libdbusObjectPathVTable = {
@@ -130,57 +216,6 @@ DBusConnection::~DBusConnection() {
delete watchContext_;
delete timeoutContext_;
}
-
- //Assert that the enforcerThread_ is in a position to finish itself correctly even after destruction
- //of the DBusConnection. Also assert all resources are cleaned up.
- auto it = timeoutMap_.begin();
- while (it != timeoutMap_.end()) {
- DBusMessageReplyAsyncHandler* asyncHandler = std::get<1>(it->second);
- DBusPendingCall* libdbusPendingCall = it->first;
-
- asyncHandler->lock();
- bool executionStarted = asyncHandler->getExecutionStarted();
- bool executionFinished = asyncHandler->getExecutionFinished();
- if (executionStarted && !executionFinished) {
- asyncHandler->setHasToBeDeleted();
- it = timeoutMap_.erase(it);
- asyncHandler->unlock();
- continue;
- }
- if (!executionStarted && !executionFinished && !dbus_pending_call_get_completed(libdbusPendingCall)) {
- dbus_pending_call_cancel(libdbusPendingCall);
- }
- asyncHandler->unlock();
-
- if (!executionStarted && !executionFinished) {
- DBusMessage& dbusMessageCall = std::get<2>(it->second);
-
- asyncHandler->onDBusMessageReply(CallStatus::REMOTE_ERROR, dbusMessageCall.createMethodError(DBUS_ERROR_TIMEOUT));
-
- dbus_pending_call_unref(libdbusPendingCall);
- }
- it = timeoutMap_.erase(it);
- delete asyncHandler;
- }
-
- auto itTimeoutInf = timeoutInfiniteAsyncHandlers_.begin();
- while (itTimeoutInf != timeoutInfiniteAsyncHandlers_.end()) {
- DBusMessageReplyAsyncHandler* asyncHandler = (*itTimeoutInf);
-
- asyncHandler->lock();
- bool executionStarted = asyncHandler->getExecutionStarted();
- bool executionFinished = asyncHandler->getExecutionFinished();
- if (executionStarted && !executionFinished) {
- asyncHandler->setHasToBeDeleted();
- itTimeoutInf = timeoutInfiniteAsyncHandlers_.erase(itTimeoutInf);
- asyncHandler->unlock();
- continue;
- }
- asyncHandler->unlock();
-
- itTimeoutInf = timeoutInfiniteAsyncHandlers_.erase(itTimeoutInf);
- delete asyncHandler;
- }
}
bool DBusConnection::attachMainLoopContext(std::weak_ptr<MainLoopContext> mainLoopContext) {
@@ -402,8 +437,7 @@ bool DBusConnection::connect(DBusError &dbusError, bool startDispatchThread) {
initLibdbusSignalFilterAfterConnect();
- enforcerThread_ = std::make_shared<std::thread>(
- std::bind(&DBusConnection::enforceAsynchronousTimeouts, shared_from_this()));
+ enforcerThread_ = new std::thread(std::bind(&DBusConnection::enforceAsynchronousTimeouts, shared_from_this()));
dbusConnectionStatusEvent_.notifyListeners(AvailabilityStatus::AVAILABLE);
@@ -415,6 +449,11 @@ bool DBusConnection::connect(DBusError &dbusError, bool startDispatchThread) {
dispatchThread_ = new std::thread(std::bind(&DBusConnection::dispatch, shared_from_this()));
}
+ std::unique_ptr<CompletionHelper> & helper = CompletionHelper::get();
+ if (helper) {
+ helper->registerConnection(reinterpret_cast<std::uintptr_t>(this));
+ }
+
return true;
}
@@ -452,16 +491,21 @@ void DBusConnection::disconnect() {
dbus_connection_close(connection_);
+ std::unique_ptr<CompletionHelper> & helper = CompletionHelper::get();
+
if(dispatchThread_) {
loop_->stop();
//It is possible for the disconnect to be called from within a callback, i.e. from within the dispatch
//thread. Self-join is prevented this way.
if (dispatchThread_->joinable() && std::this_thread::get_id() != dispatchThread_->get_id()) {
dispatchThread_->join();
+ delete dispatchThread_;
+ } else if (helper) {
+ helper->joinOnExit(reinterpret_cast<std::uintptr_t>(this), *dispatchThread_);
} else {
dispatchThread_->detach();
+ delete dispatchThread_;
}
- delete dispatchThread_;
dispatchThread_ = NULL;
}
@@ -474,10 +518,17 @@ void DBusConnection::disconnect() {
if (enforcerThread_->joinable() &&
std::this_thread::get_id() != enforcerThread_->get_id()) {
enforcerThread_->join();
+ delete enforcerThread_;
+ } else if(helper) {
+ helper->joinOnExit(reinterpret_cast<std::uintptr_t>(this), *enforcerThread_);
} else {
enforcerThread_->detach();
}
+ if (helper) {
+ helper->unregisterConnection(reinterpret_cast<std::uintptr_t>(this));
+ }
+
// remote mainloop watchers
dbus_connection_set_watch_functions(connection_, NULL, NULL, NULL, NULL, NULL);
dbus_connection_set_timeout_functions(connection_, NULL, NULL, NULL, NULL, NULL);
@@ -599,7 +650,7 @@ void DBusConnection::onLibdbusPendingCall(::DBusPendingCall* _libdbusPendingCall
CallStatus callStatus = CallStatus::SUCCESS;
if (_reply.isErrorType() || !_reply.isMethodReturnType()) {
- if(strcmp(_reply.getError(), DBUS_ERROR_UNKNOWN_METHOD) == 0) {
+ if (std::string(_reply.getError()) == std::string(DBUS_ERROR_UNKNOWN_METHOD)) {
callStatus = CallStatus::NOT_AVAILABLE;
} else {
callStatus = CallStatus::REMOTE_ERROR;
@@ -810,6 +861,47 @@ void DBusConnection::enforceAsynchronousTimeouts() {
}
}
+ {
+ std::lock_guard<std::mutex> itsLock(enforceTimeoutMutex_);
+ auto it = timeoutMap_.begin();
+ while (it != timeoutMap_.end()) {
+ DBusMessageReplyAsyncHandler* asyncHandler = std::get<1>(it->second);
+
+ asyncHandler->lock();
+ bool executionStarted = asyncHandler->getExecutionStarted();
+ bool executionFinished = asyncHandler->getExecutionFinished();
+ if (executionStarted && !executionFinished) {
+ asyncHandler->setHasToBeDeleted();
+ it = timeoutMap_.erase(it);
+ asyncHandler->unlock();
+ continue;
+ }
+ asyncHandler->unlock();
+
+ it = timeoutMap_.erase(it);
+ asyncHandlersToDelete_.push_back(asyncHandler);
+ }
+
+ auto itTimeoutInf = timeoutInfiniteAsyncHandlers_.begin();
+ while (itTimeoutInf != timeoutInfiniteAsyncHandlers_.end()) {
+ DBusMessageReplyAsyncHandler* asyncHandler = (*itTimeoutInf);
+
+ asyncHandler->lock();
+ bool executionStarted = asyncHandler->getExecutionStarted();
+ bool executionFinished = asyncHandler->getExecutionFinished();
+ if (executionStarted && !executionFinished) {
+ asyncHandler->setHasToBeDeleted();
+ itTimeoutInf = timeoutInfiniteAsyncHandlers_.erase(itTimeoutInf);
+ asyncHandler->unlock();
+ continue;
+ }
+ asyncHandler->unlock();
+
+ itTimeoutInf = timeoutInfiniteAsyncHandlers_.erase(itTimeoutInf);
+ asyncHandlersToDelete_.push_back(asyncHandler);
+ }
+ }
+
// delete left async handlers that could not be deleted by the main loop
deleteAsyncHandlers();
}
@@ -1159,7 +1251,7 @@ DBusProxyConnection::DBusSignalHandlerToken DBusConnection::addSignalMemberHandl
if(signalHandlerPathToAddIt == dbusSignalHandlersToAdd_.end()) {
// is first signal member handler for this 'dbusSignalHandlerPath' --> add
- std::map<DBusSignalHandler*, std::weak_ptr<DBusSignalHandler>> handlerList;
+ std::map<const DBusSignalHandler*, std::weak_ptr<DBusSignalHandler>> handlerList;
handlerList[itsHandler.get()] = dbusSignalHandler;
dbusSignalHandlersToAdd_.insert( {
@@ -1210,7 +1302,7 @@ bool DBusConnection::removeSignalMemberHandler(const DBusSignalHandlerToken &dbu
//check if signal handler is already added
auto signalHandlerPathIt = dbusSignalHandlers_.find(dbusSignalHandlerToken);
if(signalHandlerPathIt != dbusSignalHandlers_.end() &&
- signalHandlerPathIt->second.find(const_cast<DBusSignalHandler*>(dbusSignalHandler)) !=
+ signalHandlerPathIt->second.find(dbusSignalHandler) !=
signalHandlerPathIt->second.end()) {
// signal handler is already added
@@ -1218,19 +1310,19 @@ bool DBusConnection::removeSignalMemberHandler(const DBusSignalHandlerToken &dbu
auto signalHandlerPathToRemoveIt = dbusSignalHandlersToRemove_.find(dbusSignalHandlerToken);
if(signalHandlerPathToRemoveIt != dbusSignalHandlersToRemove_.end()) {
- auto it = signalHandlerPathToRemoveIt->second.find(const_cast<DBusSignalHandler*>(dbusSignalHandler));
+ auto it = signalHandlerPathToRemoveIt->second.find(dbusSignalHandler);
if(it == signalHandlerPathToRemoveIt->second.end()) {
// handler is not going to be removed yet --> remove
- signalHandlerPathToRemoveIt->second[const_cast<DBusSignalHandler*>(dbusSignalHandler)] =
+ signalHandlerPathToRemoveIt->second[dbusSignalHandler] =
std::weak_ptr<DBusSignalHandler>();
}
} else {
// handler is not going to be removed yet. No dbus signal handler token found --> insert with handler to remove
- std::map<DBusSignalHandler*, std::weak_ptr<DBusSignalHandler>> handlerList;
- handlerList[const_cast<DBusSignalHandler*>(dbusSignalHandler)] =
+ std::map<const DBusSignalHandler*, std::weak_ptr<DBusSignalHandler>> handlerList;
+ handlerList[dbusSignalHandler] =
std::weak_ptr<DBusSignalHandler>();
dbusSignalHandlersToRemove_.insert( {
@@ -1245,7 +1337,7 @@ bool DBusConnection::removeSignalMemberHandler(const DBusSignalHandlerToken &dbu
auto signalHandlerPathToAddIt = dbusSignalHandlersToAdd_.find(dbusSignalHandlerToken);
if(signalHandlerPathToAddIt != dbusSignalHandlersToAdd_.end()) {
- auto handlersToAddEntry = signalHandlerPathToAddIt->second.find(const_cast<DBusSignalHandler*>(dbusSignalHandler));
+ auto handlersToAddEntry = signalHandlerPathToAddIt->second.find(dbusSignalHandler);
if(handlersToAddEntry != signalHandlerPathToAddIt->second.end()) {
// handler is planned to be added --> erase
@@ -1287,7 +1379,7 @@ bool DBusConnection::addObjectManagerSignalMemberHandler(const std::string& dbus
if(signalHandlerPathToAddIt == dbusOMSignalHandlersToAdd_.end()) {
// is first signal member handler --> add to list and add match rule
- std::pair<DBusSignalHandler*, std::weak_ptr<DBusSignalHandler>> handler =
+ std::pair<const DBusSignalHandler*, std::weak_ptr<DBusSignalHandler>> handler =
std::make_pair(itsHandler.get(), dbusSignalHandler);
dbusOMSignalHandlersToAdd_.insert( {
@@ -1321,7 +1413,7 @@ bool DBusConnection::addObjectManagerSignalMemberHandler(const std::string& dbus
}
bool DBusConnection::removeObjectManagerSignalMemberHandler(const std::string& dbusBusName,
- DBusSignalHandler* dbusSignalHandler) {
+ const DBusSignalHandler* dbusSignalHandler) {
if (dbusBusName.empty()) {
COMMONAPI_ERROR(std::string(__FUNCTION__), " empty dbusBusName");
return false;
@@ -1346,7 +1438,7 @@ bool DBusConnection::removeObjectManagerSignalMemberHandler(const std::string& d
}
} else {
// no dbus signal handler found for 'dbusBusName' --> insert with handler
- std::pair<DBusSignalHandler*, std::weak_ptr<DBusSignalHandler>> handler =
+ std::pair<const DBusSignalHandler*, std::weak_ptr<DBusSignalHandler>> handler =
std::make_pair(dbusSignalHandler, std::weak_ptr<DBusSignalHandler>());
dbusOMSignalHandlersToRemove_.insert( {
@@ -1651,23 +1743,23 @@ void DBusConnection::removeLibdbusSignalMatchRule(const std::string& objectPath,
if (!matchRuleFound) {
COMMONAPI_ERROR(std::string(__FUNCTION__), " no match rule found for path: ", objectPath,
"interface: ", interfaceName, " member: ", interfaceMemberName);
- }
-
- uint32_t& matchRuleReferenceCount = matchRuleIterator->second.first;
- if (matchRuleReferenceCount > 1) {
- matchRuleReferenceCount--;
- return;
- }
+ } else {
+ uint32_t& matchRuleReferenceCount = matchRuleIterator->second.first;
+ if (matchRuleReferenceCount > 1) {
+ matchRuleReferenceCount--;
+ return;
+ }
- if (isConnected()) {
- const std::string& matchRuleString = matchRuleIterator->second.second;
- const bool libdbusSuccess = removeLibdbusSignalMatchRule(matchRuleString);
- if (!libdbusSuccess) {
- COMMONAPI_ERROR(std::string(__FUNCTION__), " removeLibdbusSignalMatchRule failed ", matchRuleString);
+ if (isConnected()) {
+ const std::string& matchRuleString = matchRuleIterator->second.second;
+ const bool libdbusSuccess = removeLibdbusSignalMatchRule(matchRuleString);
+ if (!libdbusSuccess) {
+ COMMONAPI_ERROR(std::string(__FUNCTION__), " removeLibdbusSignalMatchRule failed ", matchRuleString);
+ }
}
- }
- dbusSignalMatchRulesMap_.erase(matchRuleIterator);
+ dbusSignalMatchRulesMap_.erase(matchRuleIterator);
+ }
}
void DBusConnection::initLibdbusObjectPathHandlerAfterConnect() {
@@ -1787,7 +1879,7 @@ void DBusConnection::notifyDBusSignalHandlers(DBusSignalHandlerPath handlerPath,
auto signalHandlerPathIt = dbusSignalHandlers_.find(handlerPath);
if(signalHandlerPathIt == dbusSignalHandlers_.end()) {
- std::map<DBusSignalHandler*, std::weak_ptr<DBusSignalHandler>> handlerList;
+ std::map<const DBusSignalHandler*, std::weak_ptr<DBusSignalHandler>> handlerList;
handlerList[handlerToAddIt->first] = handlerToAddIt->second;
dbusSignalHandlers_.insert( {
@@ -1803,7 +1895,7 @@ void DBusConnection::notifyDBusSignalHandlers(DBusSignalHandlerPath handlerPath,
}
// ensure, the registry survives
- std::shared_ptr<DBusServiceRegistry> itsRegistry_ = DBusServiceRegistry::get(shared_from_this());
+ std::shared_ptr<DBusServiceRegistry> itsRegistry_ = DBusServiceRegistry::get(shared_from_this(), false);
// notify
auto signalHandlerPathIt = dbusSignalHandlers_.find(handlerPath);
@@ -2041,5 +2133,50 @@ uint32_t DBusConnection::getNumberOfSignalMemberHandlers(DBusSignalHandlerPath h
return handlers - handlersToRemove + handlersToAdd;
}
+void DBusConnection::addSignalStateHandler(
+ std::shared_ptr<DBusProxyConnection::DBusSignalHandler> _handler,
+ const uint32_t _subscription) {
+ {
+ std::lock_guard<std::mutex> itsLock(signalStateHandlersMutex_);
+ signalStateHandlers_[_handler].insert(_subscription);
+ }
+ auto function = std::bind(&DBusProxyConnection::DBusSignalHandler::onSpecificError,
+ _handler, std::placeholders::_1, std::placeholders::_2);
+
+ proxyPushFunctionToMainLoop(function, CommonAPI::CallStatus::SUCCESS, _subscription);
+}
+
+void DBusConnection::removeSignalStateHandler(
+ std::shared_ptr<DBusProxyConnection::DBusSignalHandler> _handler,
+ const uint32_t _tag, bool _remove_all) {
+ {
+ std::lock_guard<std::mutex> itsLock(signalStateHandlersMutex_);
+ if (_remove_all) {
+ signalStateHandlers_.erase(_handler);
+ } else {
+ auto itsHandler = signalStateHandlers_.find(_handler);
+ if (itsHandler != signalStateHandlers_.end()) {
+ itsHandler->second.erase(_tag);
+ if (itsHandler->second.size() == 0) {
+ signalStateHandlers_.erase(_handler);
+ }
+ }
+ }
+ }
+}
+
+void DBusConnection::handleSignalStates() {
+ std::map<std::shared_ptr<DBusProxyConnection::DBusSignalHandler>, std::set<uint32_t>> tmpHandlers;
+ {
+ std::lock_guard<std::mutex> itsLock(signalStateHandlersMutex_);
+ tmpHandlers = signalStateHandlers_;
+ }
+ for (auto itsHandler : tmpHandlers) {
+ for (uint32_t tag : itsHandler.second) {
+ itsHandler.first->onSpecificError(CommonAPI::CallStatus::SUCCESS, tag);
+ }
+ }
+}
+
} // namespace DBus
} // namespace CommonAPI