summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilip Rauwolf <rauwolf@itestra.de>2013-07-05 17:18:35 +0200
committerPhilip Rauwolf <rauwolf@itestra.de>2013-07-05 17:18:35 +0200
commitf4c3113bb55041ac43a6fcd2287e0d63a4abf6ad (patch)
treea856b37f09456df5963f996de6d80821d587e29c
parentffe6b330c356c713dc2ca3dbb7ef67eeeb3ed7b4 (diff)
downloadgenivi-common-api-dbus-runtime-f4c3113bb55041ac43a6fcd2287e0d63a4abf6ad.tar.gz
Improved DBusConnection lifetime and timeout handling for async calls.
Problem was that if some calls never returned, they indirectly kept a shared_ptr to the Connection for the rest of the program lifetime. The current workaround in DBusServiceRegistry handled the issue from application perspective, but did not destroy the libdbus pending calls. Change-Id: Iacabacec8367924bacf9698211e3f2b63c804dd5
-rw-r--r--src/CommonAPI/DBus/DBusConnection.cpp126
-rw-r--r--src/CommonAPI/DBus/DBusConnection.h12
-rw-r--r--src/CommonAPI/DBus/DBusDaemonProxy.cpp9
-rw-r--r--src/CommonAPI/DBus/DBusMessage.cpp8
-rw-r--r--src/CommonAPI/DBus/DBusMessage.h2
-rw-r--r--src/CommonAPI/DBus/DBusServiceRegistry.cpp70
-rw-r--r--src/CommonAPI/DBus/DBusServiceRegistry.h1
7 files changed, 169 insertions, 59 deletions
diff --git a/src/CommonAPI/DBus/DBusConnection.cpp b/src/CommonAPI/DBus/DBusConnection.cpp
index a252eab..950c72a 100644
--- a/src/CommonAPI/DBus/DBusConnection.cpp
+++ b/src/CommonAPI/DBus/DBusConnection.cpp
@@ -4,12 +4,21 @@
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+// Workaround for libstdc++ bug
+#ifndef _GLIBCXX_USE_NANOSLEEP
+#define _GLIBCXX_USE_NANOSLEEP
+#endif
+
#include "DBusConnection.h"
#include "DBusInputStream.h"
+#include <algorithm>
#include <sstream>
#include <cassert>
#include <future>
+#include <chrono>
+#include <thread>
namespace CommonAPI {
namespace DBus {
@@ -29,15 +38,20 @@ DBusObjectPathVTable DBusConnection::libdbusObjectPathVTable_ = {
&DBusConnection::onLibdbusObjectPathMessageThunk
};
-void DBusConnection::dispatch(std::shared_ptr<DBusConnection>* selfReference) {
- while (!stopDispatching_ && readWriteDispatch(10) && !selfReference->unique()) {
- if(pauseDispatching_) {
+
+//std::bind used to start the dispatch thread holds one reference, and the selfReference
+//created within the thread is the second. If only those two remain, no one but the
+//dispatch thread references the connection, which therefore can be finished.
+constexpr uint32_t ownUseCount = 2;
+
+void DBusConnection::dispatch() {
+ std::shared_ptr<DBusConnection> selfReference = this->shared_from_this();
+ while (!stopDispatching_ && readWriteDispatch(10) && selfReference.use_count() > ownUseCount) {
+ if (pauseDispatching_) {
dispatchSuspendLock_.lock();
dispatchSuspendLock_.unlock();
}
}
- delete selfReference;
-
}
bool DBusConnection::readWriteDispatch(int timeoutMilliseconds) {
@@ -68,7 +82,10 @@ DBusConnection::DBusConnection(BusType busType) :
dispatchThread_(NULL),
dbusObjectMessageHandler_(),
watchContext_(NULL),
- connectionNameCount_() {
+ connectionNameCount_(),
+ dispatchSource_(),
+ mainLoopContext_(std::shared_ptr<MainLoopContext>(NULL)),
+ enforcerThread(NULL) {
dbus_threads_init_default();
}
@@ -81,7 +98,10 @@ DBusConnection::DBusConnection(::DBusConnection* libDbusConnection) :
dispatchThread_(NULL),
dbusObjectMessageHandler_(),
watchContext_(NULL),
- connectionNameCount_() {
+ connectionNameCount_(),
+ dispatchSource_(),
+ mainLoopContext_(std::shared_ptr<MainLoopContext>(NULL)),
+ enforcerThread(NULL) {
dbus_threads_init_default();
}
@@ -262,9 +282,8 @@ bool DBusConnection::connect(DBusError& dbusError, bool startDispatchThread) {
initLibdbusSignalFilterAfterConnect();
- if(startDispatchThread) {
- std::shared_ptr<DBusConnection>* ptr = new std::shared_ptr<DBusConnection>(this->shared_from_this());
- dispatchThread_ = new std::thread(&DBusConnection::dispatch, this, ptr);
+ if (startDispatchThread) {
+ dispatchThread_ = new std::thread(std::bind(&DBusConnection::dispatch, this->shared_from_this()));
}
stopDispatching_ = !startDispatchThread;
@@ -427,10 +446,59 @@ void DBusConnection::onLibdbusDataCleanup(void* userData) {
delete dbusMessageReplyAsyncHandler;
}
+
+//Would not be needed if libdbus would actually handle its timeouts for pending calls.
+void DBusConnection::enforceAsynchronousTimeouts() const {
+ enforeTimeoutMutex.lock();
+
+ while (!timeoutMap.empty()) {
+ auto minTimeoutElement = std::min_element(timeoutMap.begin(), timeoutMap.end(),
+ [] (const TimeoutMapElement& lhs, const TimeoutMapElement& rhs) {
+ return std::get<0>(lhs.second) < std::get<0>(rhs.second);
+ });
+
+ int minTimeout = std::get<0>(minTimeoutElement->second);
+
+ enforeTimeoutMutex.unlock();
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(minTimeout));
+
+ enforeTimeoutMutex.lock();
+
+ for (auto it = timeoutMap.begin(); it != timeoutMap.end(); ) {
+ int& currentTimeout = std::get<0>(it->second);
+ currentTimeout -= minTimeout;
+ if (currentTimeout <= 0) {
+ DBusPendingCall* libdbusPendingCall = it->first;
+
+ if (!dbus_pending_call_get_completed(libdbusPendingCall)) {
+ dbus_pending_call_cancel(libdbusPendingCall);
+ DBusMessageReplyAsyncHandler* asyncHandler = std::get<1>(it->second);
+ DBusMessage& dbusMessageCall = std::get<2>(it->second);
+ asyncHandler->onDBusMessageReply(CallStatus::REMOTE_ERROR, dbusMessageCall.createMethodError(DBUS_ERROR_TIMEOUT));
+ delete asyncHandler;
+ }
+ dbus_pending_call_unref(libdbusPendingCall);
+ it = timeoutMap.erase(it);
+ } else {
+ ++it;
+ }
+ }
+ }
+
+ //Must be atomic with respect to local threading
+ auto threadPtr = enforcerThread;
+ enforcerThread = NULL;
+ enforeTimeoutMutex.unlock();
+
+ threadPtr->detach();
+ delete threadPtr;
+}
+
std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync(
- const DBusMessage& dbusMessage,
- std::unique_ptr<DBusMessageReplyAsyncHandler> dbusMessageReplyAsyncHandler,
- int timeoutMilliseconds) const {
+ const DBusMessage& dbusMessage,
+ std::unique_ptr<DBusMessageReplyAsyncHandler> dbusMessageReplyAsyncHandler,
+ int timeoutMilliseconds) const {
assert(dbusMessage);
assert(isConnected());
@@ -438,14 +506,13 @@ std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync(
DBusPendingCall* libdbusPendingCall;
dbus_bool_t libdbusSuccess;
- libdbusSuccess = dbus_connection_send_with_reply(
- libdbusConnection_,
- dbusMessage.libdbusMessage_,
- &libdbusPendingCall,
- timeoutMilliseconds);
+ libdbusSuccess = dbus_connection_send_with_reply(libdbusConnection_,
+ dbusMessage.libdbusMessage_,
+ &libdbusPendingCall,
+ timeoutMilliseconds);
if (!libdbusSuccess || !libdbusPendingCall) {
- dbusMessageReplyAsyncHandler->onDBusMessageReply(CallStatus::CONNECTION_FAILED, dbusMessage);
+ dbusMessageReplyAsyncHandler->onDBusMessageReply(CallStatus::CONNECTION_FAILED, dbusMessage.createMethodError(DBUS_ERROR_DISCONNECTED));
return dbusMessageReplyAsyncHandler->getFuture();
}
@@ -456,20 +523,33 @@ std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync(
onLibdbusDataCleanup);
if (!libdbusSuccess) {
- dbusMessageReplyAsyncHandler->onDBusMessageReply(CallStatus::OUT_OF_MEMORY, dbusMessage);
+ dbusMessageReplyAsyncHandler->onDBusMessageReply(CallStatus::OUT_OF_MEMORY, dbusMessage);
dbus_pending_call_unref(libdbusPendingCall);
return dbusMessageReplyAsyncHandler->getFuture();
}
- return dbusMessageReplyAsyncHandler.release()->getFuture();
+ DBusMessageReplyAsyncHandler* replyAsyncHandler = dbusMessageReplyAsyncHandler.release();
+
+ const bool mainloopContextIsPresent = (bool) mainLoopContext_.lock();
+ if (!mainloopContextIsPresent && timeoutMilliseconds != DBUS_TIMEOUT_INFINITE) {
+ dbus_pending_call_ref(libdbusPendingCall);
+ std::tuple<int, DBusMessageReplyAsyncHandler*, DBusMessage> toInsert {timeoutMilliseconds, replyAsyncHandler, dbusMessage};
+
+ enforeTimeoutMutex.lock();
+ timeoutMap.insert( {libdbusPendingCall, toInsert } );
+ if (!enforcerThread) {
+ enforcerThread = new std::thread(std::bind(&DBusConnection::enforceAsynchronousTimeouts, this->shared_from_this()));
+ }
+ enforeTimeoutMutex.unlock();
+ }
+
+ return replyAsyncHandler->getFuture();
}
DBusMessage DBusConnection::sendDBusMessageWithReplyAndBlock(const DBusMessage& dbusMessage,
DBusError& dbusError,
int timeoutMilliseconds) const {
- auto selfReference = this->shared_from_this();
-
assert(dbusMessage);
assert(!dbusError);
assert(isConnected());
diff --git a/src/CommonAPI/DBus/DBusConnection.h b/src/CommonAPI/DBus/DBusConnection.h
index 68e8937..2200cd2 100644
--- a/src/CommonAPI/DBus/DBusConnection.h
+++ b/src/CommonAPI/DBus/DBusConnection.h
@@ -49,8 +49,6 @@ struct WatchContext {
class DBusConnection: public DBusProxyConnection, public std::enable_shared_from_this<DBusConnection> {
public:
-
-
DBusConnection(BusType busType);
inline static std::shared_ptr<DBusConnection> getBus(const BusType& busType);
@@ -116,8 +114,7 @@ class DBusConnection: public DBusProxyConnection, public std::enable_shared_from
bool singleDispatch();
private:
- void dispatch(std::shared_ptr<DBusConnection>* selfReference);
- //void dispatch();
+ void dispatch();
void suspendDispatching() const;
void resumeDispatching() const;
@@ -166,6 +163,8 @@ class DBusConnection: public DBusProxyConnection, public std::enable_shared_from
static void onWakeupMainContext(void* data);
+ void enforceAsynchronousTimeouts() const;
+
::DBusConnection* libdbusConnection_;
mutable std::mutex libdbusConnectionGuard_;
std::mutex signalGuard_;
@@ -195,6 +194,11 @@ class DBusConnection: public DBusProxyConnection, public std::enable_shared_from
DBusObjectPathMessageHandler dbusObjectMessageHandler_;
mutable std::unordered_map<std::string, uint16_t> connectionNameCount_;
+
+ typedef std::pair<DBusPendingCall*, std::tuple<int, DBusMessageReplyAsyncHandler*, DBusMessage> > TimeoutMapElement;
+ mutable std::map<DBusPendingCall*, std::tuple<int, DBusMessageReplyAsyncHandler*, DBusMessage>> timeoutMap;
+ mutable std::thread* enforcerThread;
+ mutable std::mutex enforeTimeoutMutex;
};
std::shared_ptr<DBusConnection> DBusConnection::getBus(const BusType& busType) {
diff --git a/src/CommonAPI/DBus/DBusDaemonProxy.cpp b/src/CommonAPI/DBus/DBusDaemonProxy.cpp
index fe81995..1401125 100644
--- a/src/CommonAPI/DBus/DBusDaemonProxy.cpp
+++ b/src/CommonAPI/DBus/DBusDaemonProxy.cpp
@@ -107,7 +107,8 @@ std::future<CallStatus> DBusDaemonProxy::listNamesAsync(ListNamesAsyncCallback l
return getDBusConnection()->sendDBusMessageWithReplyAsync(
dbusMessage,
- DBusProxyAsyncCallbackHandler<std::vector<std::string>>::create(listNamesAsyncCallback));
+ DBusProxyAsyncCallbackHandler<std::vector<std::string>>::create(listNamesAsyncCallback),
+ 2000);
}
void DBusDaemonProxy::nameHasOwner(const std::string& busName, CommonAPI::CallStatus& callStatus, bool& hasOwner) const {
@@ -153,7 +154,8 @@ std::future<CallStatus> DBusDaemonProxy::nameHasOwnerAsync(const std::string& bu
return getDBusConnection()->sendDBusMessageWithReplyAsync(
dbusMessage,
- DBusProxyAsyncCallbackHandler<bool>::create(nameHasOwnerAsyncCallback));
+ DBusProxyAsyncCallbackHandler<bool>::create(nameHasOwnerAsyncCallback),
+ 2000);
}
std::future<CallStatus> DBusDaemonProxy::getManagedObjectsAsync(const std::string& forDBusServiceName, GetManagedObjectsAsyncCallback callback) const {
@@ -167,7 +169,8 @@ std::future<CallStatus> DBusDaemonProxy::getManagedObjectsAsync(const std::strin
return getDBusConnection()->sendDBusMessageWithReplyAsync(
dbusMethodCallMessage,
- DBusProxyAsyncCallbackHandler<DBusObjectToInterfaceDict>::create(callback));
+ DBusProxyAsyncCallbackHandler<DBusObjectToInterfaceDict>::create(callback),
+ 2000);
}
diff --git a/src/CommonAPI/DBus/DBusMessage.cpp b/src/CommonAPI/DBus/DBusMessage.cpp
index fc99b90..110584d 100644
--- a/src/CommonAPI/DBus/DBusMessage.cpp
+++ b/src/CommonAPI/DBus/DBusMessage.cpp
@@ -139,6 +139,14 @@ DBusMessage DBusMessage::createMethodReturn(const std::string& signature) const
return createMethodReturn(signature.empty() ? NULL : signature.c_str());
}
+DBusMessage DBusMessage::createMethodError(const std::string& name, const std::string& reason) const {
+ ::DBusMessage* libdbusMessageError = dbus_message_new_error(libdbusMessage_, name.c_str(), reason.c_str());
+ assert(libdbusMessageError);
+
+ const bool increaseLibdbusMessageReferenceCount = false;
+ return DBusMessage(libdbusMessageError, increaseLibdbusMessageReferenceCount);
+}
+
DBusMessage DBusMessage::createSignal(const char* objectPath,
const char* interfaceName,
const char* signalName,
diff --git a/src/CommonAPI/DBus/DBusMessage.h b/src/CommonAPI/DBus/DBusMessage.h
index 0a77bde..839ff64 100644
--- a/src/CommonAPI/DBus/DBusMessage.h
+++ b/src/CommonAPI/DBus/DBusMessage.h
@@ -57,6 +57,8 @@ class DBusMessage {
DBusMessage createMethodReturn(const std::string& signature) const;
+ DBusMessage createMethodError(const std::string& name, const std::string& reason = "") const;
+
static DBusMessage createSignal(const char* objectPath,
const char* interfaceName,
const char* signalName,
diff --git a/src/CommonAPI/DBus/DBusServiceRegistry.cpp b/src/CommonAPI/DBus/DBusServiceRegistry.cpp
index ed2fa52..0957485 100644
--- a/src/CommonAPI/DBus/DBusServiceRegistry.cpp
+++ b/src/CommonAPI/DBus/DBusServiceRegistry.cpp
@@ -134,35 +134,20 @@ std::vector<std::string> DBusServiceRegistry::getAvailableServiceInstances(const
return availableServiceInstances;
}
- while (timeout.count() > 0) {
- size_t dbusServiceResolvingCount = getResolvedServiceInstances(interfaceName, availableServiceInstances);
+ size_t dbusServiceResolvingCount = getResolvedServiceInstances(interfaceName, availableServiceInstances);
- if (!dbusServiceResolvingCount) {
- break;
- }
-
- // wait for unknown and acquiring services, then restart from the beginning
- typedef std::chrono::high_resolution_clock clock;
- clock::time_point startTimePoint = clock::now();
+ if (!dbusServiceResolvingCount) {
+ return availableServiceInstances;
+ }
- size_t wakeupCount = 0;
- dbusServiceChanged_.wait_for(
- dbusServicesLock,
- timeout,
- [&] {
- wakeupCount++;
- return wakeupCount > dbusServiceResolvingCount;
- });
+ dbusServiceChanged_.wait(
+ dbusServicesLock,
+ [&] {
+ return getNumResolvingServiceInstances() == 0;
+ });
- if (wakeupCount > 1) {
- getResolvedServiceInstances(interfaceName, availableServiceInstances);
- break;
- }
+ getResolvedServiceInstances(interfaceName, availableServiceInstances);
- std::chrono::milliseconds elapsedWaitTime =
- std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() - startTimePoint);
- timeout -= elapsedWaitTime;
- }
// maybe partial list but it contains everything we know for now
return availableServiceInstances;
@@ -182,12 +167,11 @@ void DBusServiceRegistry::getAvailableServiceInstancesAsync(Factory::GetAvailabl
size_t stillResolvingCount = getResolvedServiceInstances(interfaceName, availableServiceInstances);
- if(stillResolvingCount == 0 && !dbusServices_.empty()) {
+ if (stillResolvingCount == 0 && !dbusServices_.empty()) {
callback(availableServiceInstances);
} else {
- //This is a necessary hack, because libdbus never returns from async calls if a
- //service handles it's answers the wrong way. Here an artificial timeout is
- //added to circumvent this limitation.
+ //Necessary as service discovery might need some time, but the async version of "getAvailableServiceInstances"
+ //shall return without delay.
std::thread(
[this, callback, interfaceName, domainName](std::shared_ptr<DBusServiceRegistry> selfRef) {
auto availableServiceInstances = getAvailableServiceInstances(interfaceName, domainName);
@@ -198,6 +182,34 @@ void DBusServiceRegistry::getAvailableServiceInstancesAsync(Factory::GetAvailabl
}
+size_t DBusServiceRegistry::getNumResolvingServiceInstances() {
+ size_t dbusServicesResolvingCount = 0;
+
+ // caller must hold lock
+ auto dbusServiceIterator = dbusServices_.begin();
+ while (dbusServiceIterator != dbusServices_.end()) {
+ DBusServiceState& dbusServiceState = dbusServiceIterator->second.first;
+
+ switch (dbusServiceState) {
+ case DBusServiceState::AVAILABLE:
+ dbusServicesResolvingCount++;
+ break;
+
+ case DBusServiceState::RESOLVING:
+ dbusServicesResolvingCount++;
+ break;
+
+ default:
+ break;
+ }
+
+ dbusServiceIterator++;
+ }
+
+ return dbusServicesResolvingCount;
+}
+
+
size_t DBusServiceRegistry::getResolvedServiceInstances(const std::string& dbusInterfaceName, std::vector<std::string>& availableServiceInstances) {
size_t dbusServicesResolvingCount = 0;
diff --git a/src/CommonAPI/DBus/DBusServiceRegistry.h b/src/CommonAPI/DBus/DBusServiceRegistry.h
index 28dd6a1..fa40b9e 100644
--- a/src/CommonAPI/DBus/DBusServiceRegistry.h
+++ b/src/CommonAPI/DBus/DBusServiceRegistry.h
@@ -100,6 +100,7 @@ class DBusServiceRegistry: public std::enable_shared_from_this<DBusServiceRegist
void onGetManagedObjectsCallback(const CallStatus& status, DBusDaemonProxy::DBusObjectToInterfaceDict managedObjects, const std::string& dbusServiceName);
size_t getResolvedServiceInstances(const std::string& dbusInterfaceName, std::vector<std::string>& availableServiceInstances);
+ size_t getNumResolvingServiceInstances();
bool waitDBusServicesAvailable(std::unique_lock<std::mutex>& lock, std::chrono::milliseconds& timeout);