summaryrefslogtreecommitdiff
path: root/lib/cpp/src/thrift/concurrency/Monitor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/cpp/src/thrift/concurrency/Monitor.cpp')
-rw-r--r--lib/cpp/src/thrift/concurrency/Monitor.cpp164
1 files changed, 76 insertions, 88 deletions
diff --git a/lib/cpp/src/thrift/concurrency/Monitor.cpp b/lib/cpp/src/thrift/concurrency/Monitor.cpp
index 9570cc691..7b3b209a7 100644
--- a/lib/cpp/src/thrift/concurrency/Monitor.cpp
+++ b/lib/cpp/src/thrift/concurrency/Monitor.cpp
@@ -23,43 +23,36 @@
#include <thrift/concurrency/Exception.h>
#include <thrift/concurrency/Util.h>
#include <thrift/transport/PlatformSocket.h>
-#include <memory>
-
#include <assert.h>
-#include <iostream>
-
-#include <pthread.h>
+#include <condition_variable>
+#include <chrono>
+#include <thread>
+#include <mutex>
namespace apache {
namespace thrift {
-
-using std::unique_ptr;
-using std::shared_ptr;
-
namespace concurrency {
/**
- * Monitor implementation using the POSIX pthread library
+ * Monitor implementation using the std thread library
*
* @version $Id:$
*/
class Monitor::Impl {
public:
- Impl() : ownedMutex_(new Mutex()), mutex_(NULL), condInitialized_(false) {
- init(ownedMutex_.get());
- }
-
- Impl(Mutex* mutex) : mutex_(NULL), condInitialized_(false) { init(mutex); }
+ Impl() : ownedMutex_(new Mutex()), conditionVariable_(), mutex_(NULL) { init(ownedMutex_.get()); }
- Impl(Monitor* monitor) : mutex_(NULL), condInitialized_(false) { init(&(monitor->mutex())); }
+ Impl(Mutex* mutex) : ownedMutex_(), conditionVariable_(), mutex_(NULL) { init(mutex); }
- ~Impl() { cleanup(); }
+ Impl(Monitor* monitor) : ownedMutex_(), conditionVariable_(), mutex_(NULL) {
+ init(&(monitor->mutex()));
+ }
Mutex& mutex() { return *mutex_; }
- void lock() { mutex().lock(); }
- void unlock() { mutex().unlock(); }
+ void lock() { mutex_->lock(); }
+ void unlock() { mutex_->unlock(); }
/**
* Exception-throwing version of waitForTimeRelative(), called simply
@@ -68,15 +61,12 @@ public:
* If the condition occurs, this function returns cleanly; on timeout or
* error an exception is thrown.
*/
- void wait(int64_t timeout_ms) const {
+ void wait(int64_t timeout_ms) {
int result = waitForTimeRelative(timeout_ms);
if (result == THRIFT_ETIMEDOUT) {
- // pthread_cond_timedwait has been observed to return early on
- // various platforms, so comment out this assert.
- // assert(Util::currentTime() >= (now + timeout));
throw TimedOutException();
} else if (result != 0) {
- throw TException("pthread_cond_wait() or pthread_cond_timedwait() failed");
+ throw TException("Monitor::wait() failed");
}
}
@@ -86,88 +76,86 @@ public:
*
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
*/
- int waitForTimeRelative(int64_t timeout_ms) const {
+ int waitForTimeRelative(int64_t timeout_ms) {
if (timeout_ms == 0LL) {
return waitForever();
}
- struct THRIFT_TIMESPEC abstime;
- Util::toTimespec(abstime, Util::currentTime() + timeout_ms);
- return waitForTime(&abstime);
+ assert(mutex_);
+ std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
+ assert(mutexImpl);
+
+ std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
+ bool timedout = (conditionVariable_.wait_for(lock, std::chrono::milliseconds(timeout_ms))
+ == std::cv_status::timeout);
+ lock.release();
+ return (timedout ? THRIFT_ETIMEDOUT : 0);
}
/**
* Waits until the absolute time specified using struct THRIFT_TIMESPEC.
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
*/
- int waitForTime(const THRIFT_TIMESPEC* abstime) const {
+ int waitForTime(const THRIFT_TIMESPEC* abstime) {
+ struct timeval temp;
+ temp.tv_sec = static_cast<long>(abstime->tv_sec);
+ temp.tv_usec = static_cast<long>(abstime->tv_nsec) / 1000;
+ return waitForTime(&temp);
+ }
+
+ /**
+ * Waits until the absolute time specified using struct timeval.
+ * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
+ */
+ int waitForTime(const struct timeval* abstime) {
assert(mutex_);
- pthread_mutex_t* mutexImpl = reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl());
+ std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
assert(mutexImpl);
- // XXX Need to assert that caller owns mutex
- return pthread_cond_timedwait(&pthread_cond_, mutexImpl, abstime);
+ struct timeval currenttime;
+ Util::toTimeval(currenttime, Util::currentTime());
+
+ long tv_sec = static_cast<long>(abstime->tv_sec - currenttime.tv_sec);
+ long tv_usec = static_cast<long>(abstime->tv_usec - currenttime.tv_usec);
+ if (tv_sec < 0)
+ tv_sec = 0;
+ if (tv_usec < 0)
+ tv_usec = 0;
+
+ std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
+ bool timedout = (conditionVariable_.wait_for(lock,
+ std::chrono::seconds(tv_sec)
+ + std::chrono::microseconds(tv_usec))
+ == std::cv_status::timeout);
+ lock.release();
+ return (timedout ? THRIFT_ETIMEDOUT : 0);
}
- int waitForTime(const struct timeval* abstime) const {
- struct THRIFT_TIMESPEC temp;
- temp.tv_sec = abstime->tv_sec;
- temp.tv_nsec = abstime->tv_usec * 1000;
- return waitForTime(&temp);
- }
/**
* Waits forever until the condition occurs.
* Returns 0 if condition occurs, or an error code otherwise.
*/
- int waitForever() const {
+ int waitForever() {
assert(mutex_);
- pthread_mutex_t* mutexImpl = reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl());
+ std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
assert(mutexImpl);
- return pthread_cond_wait(&pthread_cond_, mutexImpl);
- }
- void notify() {
- // XXX Need to assert that caller owns mutex
- int iret = pthread_cond_signal(&pthread_cond_);
- THRIFT_UNUSED_VARIABLE(iret);
- assert(iret == 0);
+ std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
+ conditionVariable_.wait(lock);
+ lock.release();
+ return 0;
}
- void notifyAll() {
- // XXX Need to assert that caller owns mutex
- int iret = pthread_cond_broadcast(&pthread_cond_);
- THRIFT_UNUSED_VARIABLE(iret);
- assert(iret == 0);
- }
-
-private:
- void init(Mutex* mutex) {
- mutex_ = mutex;
+ void notify() { conditionVariable_.notify_one(); }
- if (pthread_cond_init(&pthread_cond_, NULL) == 0) {
- condInitialized_ = true;
- }
+ void notifyAll() { conditionVariable_.notify_all(); }
- if (!condInitialized_) {
- cleanup();
- throw SystemResourceException();
- }
- }
-
- void cleanup() {
- if (condInitialized_) {
- condInitialized_ = false;
- int iret = pthread_cond_destroy(&pthread_cond_);
- THRIFT_UNUSED_VARIABLE(iret);
- assert(iret == 0);
- }
- }
+private:
+ void init(Mutex* mutex) { mutex_ = mutex; }
- unique_ptr<Mutex> ownedMutex_;
+ const std::unique_ptr<Mutex> ownedMutex_;
+ std::condition_variable_any conditionVariable_;
Mutex* mutex_;
-
- mutable pthread_cond_t pthread_cond_;
- mutable bool condInitialized_;
};
Monitor::Monitor() : impl_(new Monitor::Impl()) {
@@ -182,43 +170,43 @@ Monitor::~Monitor() {
}
Mutex& Monitor::mutex() const {
- return impl_->mutex();
+ return const_cast<Monitor::Impl*>(impl_)->mutex();
}
void Monitor::lock() const {
- impl_->lock();
+ const_cast<Monitor::Impl*>(impl_)->lock();
}
void Monitor::unlock() const {
- impl_->unlock();
+ const_cast<Monitor::Impl*>(impl_)->unlock();
}
void Monitor::wait(int64_t timeout) const {
- impl_->wait(timeout);
+ const_cast<Monitor::Impl*>(impl_)->wait(timeout);
}
int Monitor::waitForTime(const THRIFT_TIMESPEC* abstime) const {
- return impl_->waitForTime(abstime);
+ return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime);
}
int Monitor::waitForTime(const timeval* abstime) const {
- return impl_->waitForTime(abstime);
+ return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime);
}
int Monitor::waitForTimeRelative(int64_t timeout_ms) const {
- return impl_->waitForTimeRelative(timeout_ms);
+ return const_cast<Monitor::Impl*>(impl_)->waitForTimeRelative(timeout_ms);
}
int Monitor::waitForever() const {
- return impl_->waitForever();
+ return const_cast<Monitor::Impl*>(impl_)->waitForever();
}
void Monitor::notify() const {
- impl_->notify();
+ const_cast<Monitor::Impl*>(impl_)->notify();
}
void Monitor::notifyAll() const {
- impl_->notifyAll();
+ const_cast<Monitor::Impl*>(impl_)->notifyAll();
}
}
}