diff options
Diffstat (limited to 'lib/cpp/src/thrift/concurrency/Monitor.cpp')
-rw-r--r-- | lib/cpp/src/thrift/concurrency/Monitor.cpp | 164 |
1 files changed, 76 insertions, 88 deletions
diff --git a/lib/cpp/src/thrift/concurrency/Monitor.cpp b/lib/cpp/src/thrift/concurrency/Monitor.cpp index 9570cc691..7b3b209a7 100644 --- a/lib/cpp/src/thrift/concurrency/Monitor.cpp +++ b/lib/cpp/src/thrift/concurrency/Monitor.cpp @@ -23,43 +23,36 @@ #include <thrift/concurrency/Exception.h> #include <thrift/concurrency/Util.h> #include <thrift/transport/PlatformSocket.h> -#include <memory> - #include <assert.h> -#include <iostream> - -#include <pthread.h> +#include <condition_variable> +#include <chrono> +#include <thread> +#include <mutex> namespace apache { namespace thrift { - -using std::unique_ptr; -using std::shared_ptr; - namespace concurrency { /** - * Monitor implementation using the POSIX pthread library + * Monitor implementation using the std thread library * * @version $Id:$ */ class Monitor::Impl { public: - Impl() : ownedMutex_(new Mutex()), mutex_(NULL), condInitialized_(false) { - init(ownedMutex_.get()); - } - - Impl(Mutex* mutex) : mutex_(NULL), condInitialized_(false) { init(mutex); } + Impl() : ownedMutex_(new Mutex()), conditionVariable_(), mutex_(NULL) { init(ownedMutex_.get()); } - Impl(Monitor* monitor) : mutex_(NULL), condInitialized_(false) { init(&(monitor->mutex())); } + Impl(Mutex* mutex) : ownedMutex_(), conditionVariable_(), mutex_(NULL) { init(mutex); } - ~Impl() { cleanup(); } + Impl(Monitor* monitor) : ownedMutex_(), conditionVariable_(), mutex_(NULL) { + init(&(monitor->mutex())); + } Mutex& mutex() { return *mutex_; } - void lock() { mutex().lock(); } - void unlock() { mutex().unlock(); } + void lock() { mutex_->lock(); } + void unlock() { mutex_->unlock(); } /** * Exception-throwing version of waitForTimeRelative(), called simply @@ -68,15 +61,12 @@ public: * If the condition occurs, this function returns cleanly; on timeout or * error an exception is thrown. */ - void wait(int64_t timeout_ms) const { + void wait(int64_t timeout_ms) { int result = waitForTimeRelative(timeout_ms); if (result == THRIFT_ETIMEDOUT) { - // pthread_cond_timedwait has been observed to return early on - // various platforms, so comment out this assert. - // assert(Util::currentTime() >= (now + timeout)); throw TimedOutException(); } else if (result != 0) { - throw TException("pthread_cond_wait() or pthread_cond_timedwait() failed"); + throw TException("Monitor::wait() failed"); } } @@ -86,88 +76,86 @@ public: * * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. */ - int waitForTimeRelative(int64_t timeout_ms) const { + int waitForTimeRelative(int64_t timeout_ms) { if (timeout_ms == 0LL) { return waitForever(); } - struct THRIFT_TIMESPEC abstime; - Util::toTimespec(abstime, Util::currentTime() + timeout_ms); - return waitForTime(&abstime); + assert(mutex_); + std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl()); + assert(mutexImpl); + + std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock); + bool timedout = (conditionVariable_.wait_for(lock, std::chrono::milliseconds(timeout_ms)) + == std::cv_status::timeout); + lock.release(); + return (timedout ? THRIFT_ETIMEDOUT : 0); } /** * Waits until the absolute time specified using struct THRIFT_TIMESPEC. * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. */ - int waitForTime(const THRIFT_TIMESPEC* abstime) const { + int waitForTime(const THRIFT_TIMESPEC* abstime) { + struct timeval temp; + temp.tv_sec = static_cast<long>(abstime->tv_sec); + temp.tv_usec = static_cast<long>(abstime->tv_nsec) / 1000; + return waitForTime(&temp); + } + + /** + * Waits until the absolute time specified using struct timeval. + * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. + */ + int waitForTime(const struct timeval* abstime) { assert(mutex_); - pthread_mutex_t* mutexImpl = reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl()); + std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl()); assert(mutexImpl); - // XXX Need to assert that caller owns mutex - return pthread_cond_timedwait(&pthread_cond_, mutexImpl, abstime); + struct timeval currenttime; + Util::toTimeval(currenttime, Util::currentTime()); + + long tv_sec = static_cast<long>(abstime->tv_sec - currenttime.tv_sec); + long tv_usec = static_cast<long>(abstime->tv_usec - currenttime.tv_usec); + if (tv_sec < 0) + tv_sec = 0; + if (tv_usec < 0) + tv_usec = 0; + + std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock); + bool timedout = (conditionVariable_.wait_for(lock, + std::chrono::seconds(tv_sec) + + std::chrono::microseconds(tv_usec)) + == std::cv_status::timeout); + lock.release(); + return (timedout ? THRIFT_ETIMEDOUT : 0); } - int waitForTime(const struct timeval* abstime) const { - struct THRIFT_TIMESPEC temp; - temp.tv_sec = abstime->tv_sec; - temp.tv_nsec = abstime->tv_usec * 1000; - return waitForTime(&temp); - } /** * Waits forever until the condition occurs. * Returns 0 if condition occurs, or an error code otherwise. */ - int waitForever() const { + int waitForever() { assert(mutex_); - pthread_mutex_t* mutexImpl = reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl()); + std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl()); assert(mutexImpl); - return pthread_cond_wait(&pthread_cond_, mutexImpl); - } - void notify() { - // XXX Need to assert that caller owns mutex - int iret = pthread_cond_signal(&pthread_cond_); - THRIFT_UNUSED_VARIABLE(iret); - assert(iret == 0); + std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock); + conditionVariable_.wait(lock); + lock.release(); + return 0; } - void notifyAll() { - // XXX Need to assert that caller owns mutex - int iret = pthread_cond_broadcast(&pthread_cond_); - THRIFT_UNUSED_VARIABLE(iret); - assert(iret == 0); - } - -private: - void init(Mutex* mutex) { - mutex_ = mutex; + void notify() { conditionVariable_.notify_one(); } - if (pthread_cond_init(&pthread_cond_, NULL) == 0) { - condInitialized_ = true; - } + void notifyAll() { conditionVariable_.notify_all(); } - if (!condInitialized_) { - cleanup(); - throw SystemResourceException(); - } - } - - void cleanup() { - if (condInitialized_) { - condInitialized_ = false; - int iret = pthread_cond_destroy(&pthread_cond_); - THRIFT_UNUSED_VARIABLE(iret); - assert(iret == 0); - } - } +private: + void init(Mutex* mutex) { mutex_ = mutex; } - unique_ptr<Mutex> ownedMutex_; + const std::unique_ptr<Mutex> ownedMutex_; + std::condition_variable_any conditionVariable_; Mutex* mutex_; - - mutable pthread_cond_t pthread_cond_; - mutable bool condInitialized_; }; Monitor::Monitor() : impl_(new Monitor::Impl()) { @@ -182,43 +170,43 @@ Monitor::~Monitor() { } Mutex& Monitor::mutex() const { - return impl_->mutex(); + return const_cast<Monitor::Impl*>(impl_)->mutex(); } void Monitor::lock() const { - impl_->lock(); + const_cast<Monitor::Impl*>(impl_)->lock(); } void Monitor::unlock() const { - impl_->unlock(); + const_cast<Monitor::Impl*>(impl_)->unlock(); } void Monitor::wait(int64_t timeout) const { - impl_->wait(timeout); + const_cast<Monitor::Impl*>(impl_)->wait(timeout); } int Monitor::waitForTime(const THRIFT_TIMESPEC* abstime) const { - return impl_->waitForTime(abstime); + return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime); } int Monitor::waitForTime(const timeval* abstime) const { - return impl_->waitForTime(abstime); + return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime); } int Monitor::waitForTimeRelative(int64_t timeout_ms) const { - return impl_->waitForTimeRelative(timeout_ms); + return const_cast<Monitor::Impl*>(impl_)->waitForTimeRelative(timeout_ms); } int Monitor::waitForever() const { - return impl_->waitForever(); + return const_cast<Monitor::Impl*>(impl_)->waitForever(); } void Monitor::notify() const { - impl_->notify(); + const_cast<Monitor::Impl*>(impl_)->notify(); } void Monitor::notifyAll() const { - impl_->notifyAll(); + const_cast<Monitor::Impl*>(impl_)->notifyAll(); } } } |