summaryrefslogtreecommitdiff
path: root/lib/cpp/src/thrift
diff options
context:
space:
mode:
authorcyy <cyyever@outlook.com>2019-01-11 22:13:12 +0800
committerJames E. King III <jking@apache.org>2019-01-11 09:13:12 -0500
commitca8af9b48f9e62edc57c4a233c6377144214ef5a (patch)
tree1ad3761595642b938c384b459666b89be68821cf /lib/cpp/src/thrift
parentc9ac8d2953a4806cc93aa4ff2e700623ecad980c (diff)
downloadthrift-ca8af9b48f9e62edc57c4a233c6377144214ef5a.tar.gz
THRIFT-4730: remove pthread code and refactor, ending up with just ThreadFactory
Diffstat (limited to 'lib/cpp/src/thrift')
-rw-r--r--lib/cpp/src/thrift/concurrency/Monitor.cpp164
-rw-r--r--lib/cpp/src/thrift/concurrency/Mutex.cpp335
-rw-r--r--lib/cpp/src/thrift/concurrency/Mutex.h101
-rw-r--r--lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp335
-rw-r--r--lib/cpp/src/thrift/concurrency/PosixThreadFactory.h129
-rw-r--r--lib/cpp/src/thrift/concurrency/StdMonitor.cpp213
-rw-r--r--lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp153
-rw-r--r--lib/cpp/src/thrift/concurrency/Thread.cpp (renamed from lib/cpp/src/thrift/concurrency/StdMutex.cpp)48
-rw-r--r--lib/cpp/src/thrift/concurrency/Thread.h132
-rw-r--r--lib/cpp/src/thrift/concurrency/ThreadFactory.cpp (renamed from lib/cpp/src/thrift/concurrency/PlatformThreadFactory.h)30
-rw-r--r--lib/cpp/src/thrift/concurrency/ThreadFactory.h (renamed from lib/cpp/src/thrift/concurrency/StdThreadFactory.h)46
-rw-r--r--lib/cpp/src/thrift/concurrency/ThreadManager.h2
-rw-r--r--lib/cpp/src/thrift/concurrency/TimerManager.h2
-rw-r--r--lib/cpp/src/thrift/server/TNonblockingServer.cpp9
-rw-r--r--lib/cpp/src/thrift/server/TNonblockingServer.h10
-rw-r--r--lib/cpp/src/thrift/server/TThreadedServer.cpp2
-rw-r--r--lib/cpp/src/thrift/server/TThreadedServer.h10
-rw-r--r--lib/cpp/src/thrift/transport/TFileTransport.h4
-rw-r--r--lib/cpp/src/thrift/windows/config.h5
19 files changed, 222 insertions, 1508 deletions
diff --git a/lib/cpp/src/thrift/concurrency/Monitor.cpp b/lib/cpp/src/thrift/concurrency/Monitor.cpp
index 9570cc691..7b3b209a7 100644
--- a/lib/cpp/src/thrift/concurrency/Monitor.cpp
+++ b/lib/cpp/src/thrift/concurrency/Monitor.cpp
@@ -23,43 +23,36 @@
#include <thrift/concurrency/Exception.h>
#include <thrift/concurrency/Util.h>
#include <thrift/transport/PlatformSocket.h>
-#include <memory>
-
#include <assert.h>
-#include <iostream>
-
-#include <pthread.h>
+#include <condition_variable>
+#include <chrono>
+#include <thread>
+#include <mutex>
namespace apache {
namespace thrift {
-
-using std::unique_ptr;
-using std::shared_ptr;
-
namespace concurrency {
/**
- * Monitor implementation using the POSIX pthread library
+ * Monitor implementation using the std thread library
*
* @version $Id:$
*/
class Monitor::Impl {
public:
- Impl() : ownedMutex_(new Mutex()), mutex_(NULL), condInitialized_(false) {
- init(ownedMutex_.get());
- }
-
- Impl(Mutex* mutex) : mutex_(NULL), condInitialized_(false) { init(mutex); }
+ Impl() : ownedMutex_(new Mutex()), conditionVariable_(), mutex_(NULL) { init(ownedMutex_.get()); }
- Impl(Monitor* monitor) : mutex_(NULL), condInitialized_(false) { init(&(monitor->mutex())); }
+ Impl(Mutex* mutex) : ownedMutex_(), conditionVariable_(), mutex_(NULL) { init(mutex); }
- ~Impl() { cleanup(); }
+ Impl(Monitor* monitor) : ownedMutex_(), conditionVariable_(), mutex_(NULL) {
+ init(&(monitor->mutex()));
+ }
Mutex& mutex() { return *mutex_; }
- void lock() { mutex().lock(); }
- void unlock() { mutex().unlock(); }
+ void lock() { mutex_->lock(); }
+ void unlock() { mutex_->unlock(); }
/**
* Exception-throwing version of waitForTimeRelative(), called simply
@@ -68,15 +61,12 @@ public:
* If the condition occurs, this function returns cleanly; on timeout or
* error an exception is thrown.
*/
- void wait(int64_t timeout_ms) const {
+ void wait(int64_t timeout_ms) {
int result = waitForTimeRelative(timeout_ms);
if (result == THRIFT_ETIMEDOUT) {
- // pthread_cond_timedwait has been observed to return early on
- // various platforms, so comment out this assert.
- // assert(Util::currentTime() >= (now + timeout));
throw TimedOutException();
} else if (result != 0) {
- throw TException("pthread_cond_wait() or pthread_cond_timedwait() failed");
+ throw TException("Monitor::wait() failed");
}
}
@@ -86,88 +76,86 @@ public:
*
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
*/
- int waitForTimeRelative(int64_t timeout_ms) const {
+ int waitForTimeRelative(int64_t timeout_ms) {
if (timeout_ms == 0LL) {
return waitForever();
}
- struct THRIFT_TIMESPEC abstime;
- Util::toTimespec(abstime, Util::currentTime() + timeout_ms);
- return waitForTime(&abstime);
+ assert(mutex_);
+ std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
+ assert(mutexImpl);
+
+ std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
+ bool timedout = (conditionVariable_.wait_for(lock, std::chrono::milliseconds(timeout_ms))
+ == std::cv_status::timeout);
+ lock.release();
+ return (timedout ? THRIFT_ETIMEDOUT : 0);
}
/**
* Waits until the absolute time specified using struct THRIFT_TIMESPEC.
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
*/
- int waitForTime(const THRIFT_TIMESPEC* abstime) const {
+ int waitForTime(const THRIFT_TIMESPEC* abstime) {
+ struct timeval temp;
+ temp.tv_sec = static_cast<long>(abstime->tv_sec);
+ temp.tv_usec = static_cast<long>(abstime->tv_nsec) / 1000;
+ return waitForTime(&temp);
+ }
+
+ /**
+ * Waits until the absolute time specified using struct timeval.
+ * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
+ */
+ int waitForTime(const struct timeval* abstime) {
assert(mutex_);
- pthread_mutex_t* mutexImpl = reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl());
+ std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
assert(mutexImpl);
- // XXX Need to assert that caller owns mutex
- return pthread_cond_timedwait(&pthread_cond_, mutexImpl, abstime);
+ struct timeval currenttime;
+ Util::toTimeval(currenttime, Util::currentTime());
+
+ long tv_sec = static_cast<long>(abstime->tv_sec - currenttime.tv_sec);
+ long tv_usec = static_cast<long>(abstime->tv_usec - currenttime.tv_usec);
+ if (tv_sec < 0)
+ tv_sec = 0;
+ if (tv_usec < 0)
+ tv_usec = 0;
+
+ std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
+ bool timedout = (conditionVariable_.wait_for(lock,
+ std::chrono::seconds(tv_sec)
+ + std::chrono::microseconds(tv_usec))
+ == std::cv_status::timeout);
+ lock.release();
+ return (timedout ? THRIFT_ETIMEDOUT : 0);
}
- int waitForTime(const struct timeval* abstime) const {
- struct THRIFT_TIMESPEC temp;
- temp.tv_sec = abstime->tv_sec;
- temp.tv_nsec = abstime->tv_usec * 1000;
- return waitForTime(&temp);
- }
/**
* Waits forever until the condition occurs.
* Returns 0 if condition occurs, or an error code otherwise.
*/
- int waitForever() const {
+ int waitForever() {
assert(mutex_);
- pthread_mutex_t* mutexImpl = reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl());
+ std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
assert(mutexImpl);
- return pthread_cond_wait(&pthread_cond_, mutexImpl);
- }
- void notify() {
- // XXX Need to assert that caller owns mutex
- int iret = pthread_cond_signal(&pthread_cond_);
- THRIFT_UNUSED_VARIABLE(iret);
- assert(iret == 0);
+ std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
+ conditionVariable_.wait(lock);
+ lock.release();
+ return 0;
}
- void notifyAll() {
- // XXX Need to assert that caller owns mutex
- int iret = pthread_cond_broadcast(&pthread_cond_);
- THRIFT_UNUSED_VARIABLE(iret);
- assert(iret == 0);
- }
-
-private:
- void init(Mutex* mutex) {
- mutex_ = mutex;
+ void notify() { conditionVariable_.notify_one(); }
- if (pthread_cond_init(&pthread_cond_, NULL) == 0) {
- condInitialized_ = true;
- }
+ void notifyAll() { conditionVariable_.notify_all(); }
- if (!condInitialized_) {
- cleanup();
- throw SystemResourceException();
- }
- }
-
- void cleanup() {
- if (condInitialized_) {
- condInitialized_ = false;
- int iret = pthread_cond_destroy(&pthread_cond_);
- THRIFT_UNUSED_VARIABLE(iret);
- assert(iret == 0);
- }
- }
+private:
+ void init(Mutex* mutex) { mutex_ = mutex; }
- unique_ptr<Mutex> ownedMutex_;
+ const std::unique_ptr<Mutex> ownedMutex_;
+ std::condition_variable_any conditionVariable_;
Mutex* mutex_;
-
- mutable pthread_cond_t pthread_cond_;
- mutable bool condInitialized_;
};
Monitor::Monitor() : impl_(new Monitor::Impl()) {
@@ -182,43 +170,43 @@ Monitor::~Monitor() {
}
Mutex& Monitor::mutex() const {
- return impl_->mutex();
+ return const_cast<Monitor::Impl*>(impl_)->mutex();
}
void Monitor::lock() const {
- impl_->lock();
+ const_cast<Monitor::Impl*>(impl_)->lock();
}
void Monitor::unlock() const {
- impl_->unlock();
+ const_cast<Monitor::Impl*>(impl_)->unlock();
}
void Monitor::wait(int64_t timeout) const {
- impl_->wait(timeout);
+ const_cast<Monitor::Impl*>(impl_)->wait(timeout);
}
int Monitor::waitForTime(const THRIFT_TIMESPEC* abstime) const {
- return impl_->waitForTime(abstime);
+ return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime);
}
int Monitor::waitForTime(const timeval* abstime) const {
- return impl_->waitForTime(abstime);
+ return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime);
}
int Monitor::waitForTimeRelative(int64_t timeout_ms) const {
- return impl_->waitForTimeRelative(timeout_ms);
+ return const_cast<Monitor::Impl*>(impl_)->waitForTimeRelative(timeout_ms);
}
int Monitor::waitForever() const {
- return impl_->waitForever();
+ return const_cast<Monitor::Impl*>(impl_)->waitForever();
}
void Monitor::notify() const {
- impl_->notify();
+ const_cast<Monitor::Impl*>(impl_)->notify();
}
void Monitor::notifyAll() const {
- impl_->notifyAll();
+ const_cast<Monitor::Impl*>(impl_)->notifyAll();
}
}
}
diff --git a/lib/cpp/src/thrift/concurrency/Mutex.cpp b/lib/cpp/src/thrift/concurrency/Mutex.cpp
index a5264617d..75802835d 100644
--- a/lib/cpp/src/thrift/concurrency/Mutex.cpp
+++ b/lib/cpp/src/thrift/concurrency/Mutex.cpp
@@ -17,202 +17,29 @@
* under the License.
*/
-// needed to test for pthread implementation capabilities:
-#define __USE_GNU
-
-#include <thrift/thrift-config.h>
-
-#include <thrift/Thrift.h>
-#include <thrift/concurrency/Exception.h>
#include <thrift/concurrency/Mutex.h>
-#include <thrift/concurrency/Util.h>
-
-#include <assert.h>
-#include <stdlib.h>
-#include <pthread.h>
-#include <signal.h>
-#include <string.h>
-#include <boost/format.hpp>
+#include <chrono>
+#include <mutex>
namespace apache {
namespace thrift {
namespace concurrency {
-// Enable this to turn on mutex contention profiling support
-// #define THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING
-
-#ifdef THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING
-
-static int32_t mutexProfilingCounter = 0;
-static int32_t mutexProfilingSampleRate = 0;
-static MutexWaitCallback mutexProfilingCallback = 0;
-
-void enableMutexProfiling(int32_t profilingSampleRate, MutexWaitCallback callback) {
- mutexProfilingSampleRate = profilingSampleRate;
- mutexProfilingCallback = callback;
-}
-
-#define PROFILE_MUTEX_START_LOCK() int64_t _lock_startTime = maybeGetProfilingStartTime();
-
-#define PROFILE_MUTEX_NOT_LOCKED() \
- do { \
- if (_lock_startTime > 0) { \
- int64_t endTime = Util::currentTimeUsec(); \
- (*mutexProfilingCallback)(this, endTime - _lock_startTime); \
- } \
- } while (0)
-
-#define PROFILE_MUTEX_LOCKED() \
- do { \
- profileTime_ = _lock_startTime; \
- if (profileTime_ > 0) { \
- profileTime_ = Util::currentTimeUsec() - profileTime_; \
- } \
- } while (0)
-
-#define PROFILE_MUTEX_START_UNLOCK() \
- int64_t _temp_profileTime = profileTime_; \
- profileTime_ = 0;
-
-#define PROFILE_MUTEX_UNLOCKED() \
- do { \
- if (_temp_profileTime > 0) { \
- (*mutexProfilingCallback)(this, _temp_profileTime); \
- } \
- } while (0)
-
-static inline int64_t maybeGetProfilingStartTime() {
- if (mutexProfilingSampleRate && mutexProfilingCallback) {
- // This block is unsynchronized, but should produce a reasonable sampling
- // rate on most architectures. The main race conditions are the gap
- // between the decrement and the test, the non-atomicity of decrement, and
- // potential caching of different values at different CPUs.
- //
- // - if two decrements race, the likeliest result is that the counter
- // decrements slowly (perhaps much more slowly) than intended.
- //
- // - many threads could potentially decrement before resetting the counter
- // to its large value, causing each additional incoming thread to
- // profile every call. This situation is unlikely to persist for long
- // as the critical gap is quite short, but profiling could be bursty.
- sig_atomic_t localValue = --mutexProfilingCounter;
- if (localValue <= 0) {
- mutexProfilingCounter = mutexProfilingSampleRate;
- return Util::currentTimeUsec();
- }
- }
-
- return 0;
-}
-
-#else
-#define PROFILE_MUTEX_START_LOCK()
-#define PROFILE_MUTEX_NOT_LOCKED()
-#define PROFILE_MUTEX_LOCKED()
-#define PROFILE_MUTEX_START_UNLOCK()
-#define PROFILE_MUTEX_UNLOCKED()
-#endif // THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING
-
-#define EINTR_LOOP(_CALL) int ret; do { ret = _CALL; } while (ret == EINTR)
-#define ABORT_ONFAIL(_CALL) { EINTR_LOOP(_CALL); if (ret) { abort(); } }
-#define THROW_SRE(_CALLSTR, RET) { throw SystemResourceException(boost::str(boost::format("%1% returned %2% (%3%)") % _CALLSTR % RET % ::strerror(RET))); }
-#define THROW_SRE_ONFAIL(_CALL) { EINTR_LOOP(_CALL); if (ret) { THROW_SRE(#_CALL, ret); } }
-#define THROW_SRE_TRYFAIL(_CALL) { EINTR_LOOP(_CALL); if (ret == 0) { return true; } else if (ret == EBUSY) { return false; } THROW_SRE(#_CALL, ret); }
-
/**
- * Implementation of Mutex class using POSIX mutex
+ * Implementation of Mutex class using C++11 std::timed_mutex
*
- * Throws apache::thrift::concurrency::SystemResourceException on error.
+ * Methods throw std::system_error on error.
*
* @version $Id:$
*/
-class Mutex::impl {
-public:
- impl(Initializer init) : initialized_(false) {
-#ifdef THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING
- profileTime_ = 0;
-#endif
- init(&pthread_mutex_);
- initialized_ = true;
- }
+class Mutex::impl : public std::timed_mutex {};
- ~impl() {
- if (initialized_) {
- initialized_ = false;
- ABORT_ONFAIL(pthread_mutex_destroy(&pthread_mutex_));
- }
- }
-
- void lock() const {
- PROFILE_MUTEX_START_LOCK();
- THROW_SRE_ONFAIL(pthread_mutex_lock(&pthread_mutex_));
- PROFILE_MUTEX_LOCKED();
- }
-
- bool trylock() const {
- THROW_SRE_TRYFAIL(pthread_mutex_trylock(&pthread_mutex_));
- }
-
- bool timedlock(int64_t milliseconds) const {
-#if defined(_POSIX_TIMEOUTS) && _POSIX_TIMEOUTS >= 200112L
- PROFILE_MUTEX_START_LOCK();
-
- struct THRIFT_TIMESPEC ts;
- Util::toTimespec(ts, milliseconds + Util::currentTime());
- EINTR_LOOP(pthread_mutex_timedlock(&pthread_mutex_, &ts));
- if (ret == 0) {
- PROFILE_MUTEX_LOCKED();
- return true;
- } else if (ret == ETIMEDOUT) {
- PROFILE_MUTEX_NOT_LOCKED();
- return false;
- }
-
- THROW_SRE("pthread_mutex_timedlock(&pthread_mutex_, &ts)", ret);
-#else
- /* Otherwise follow solution used by Mono for Android */
- struct THRIFT_TIMESPEC sleepytime, now, to;
-
- /* This is just to avoid a completely busy wait */
- sleepytime.tv_sec = 0;
- sleepytime.tv_nsec = 10000000L; /* 10ms */
-
- Util::toTimespec(to, milliseconds + Util::currentTime());
-
- while ((trylock()) == false) {
- Util::toTimespec(now, Util::currentTime());
- if (now.tv_sec >= to.tv_sec && now.tv_nsec >= to.tv_nsec) {
- return false;
- }
- nanosleep(&sleepytime, NULL);
- }
-
- return true;
-#endif
- }
-
- void unlock() const {
- PROFILE_MUTEX_START_UNLOCK();
- THROW_SRE_ONFAIL(pthread_mutex_unlock(&pthread_mutex_));
- PROFILE_MUTEX_UNLOCKED();
- }
-
- void* getUnderlyingImpl() const { return (void*)&pthread_mutex_; }
-
-private:
- mutable pthread_mutex_t pthread_mutex_;
- mutable bool initialized_;
-#ifdef THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING
- mutable int64_t profileTime_;
-#endif
-};
-
-Mutex::Mutex(Initializer init) : impl_(new Mutex::impl(init)) {
+Mutex::Mutex() : impl_(new Mutex::impl()) {
}
void* Mutex::getUnderlyingImpl() const {
- return impl_->getUnderlyingImpl();
+ return impl_.get();
}
void Mutex::lock() const {
@@ -220,161 +47,17 @@ void Mutex::lock() const {
}
bool Mutex::trylock() const {
- return impl_->trylock();
+ return impl_->try_lock();
}
bool Mutex::timedlock(int64_t ms) const {
- return impl_->timedlock(ms);
+ return impl_->try_lock_for(std::chrono::milliseconds(ms));
}
void Mutex::unlock() const {
impl_->unlock();
}
-void Mutex::DEFAULT_INITIALIZER(void* arg) {
- pthread_mutex_t* pthread_mutex = (pthread_mutex_t*)arg;
- THROW_SRE_ONFAIL(pthread_mutex_init(pthread_mutex, NULL));
-}
-
-#if defined(PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) || defined(PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP) || defined(PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP)
-static void init_with_kind(pthread_mutex_t* mutex, int kind) {
- pthread_mutexattr_t mutexattr;
- THROW_SRE_ONFAIL(pthread_mutexattr_init(&mutexattr));
- THROW_SRE_ONFAIL(pthread_mutexattr_settype(&mutexattr, kind));
- THROW_SRE_ONFAIL(pthread_mutex_init(mutex, &mutexattr));
- THROW_SRE_ONFAIL(pthread_mutexattr_destroy(&mutexattr));
-}
-#endif
-
-#ifdef PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
-void Mutex::ADAPTIVE_INITIALIZER(void* arg) {
- // From mysql source: mysys/my_thr_init.c
- // Set mutex type to "fast" a.k.a "adaptive"
- //
- // In this case the thread may steal the mutex from some other thread
- // that is waiting for the same mutex. This will save us some
- // context switches but may cause a thread to 'starve forever' while
- // waiting for the mutex (not likely if the code within the mutex is
- // short).
- init_with_kind((pthread_mutex_t*)arg, PTHREAD_MUTEX_ADAPTIVE_NP);
-}
-#endif
-
-#ifdef PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP
-void Mutex::ERRORCHECK_INITIALIZER(void* arg) {
- init_with_kind((pthread_mutex_t*)arg, PTHREAD_MUTEX_ERRORCHECK);
-}
-#endif
-
-#ifdef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP
-void Mutex::RECURSIVE_INITIALIZER(void* arg) {
- init_with_kind((pthread_mutex_t*)arg, PTHREAD_MUTEX_RECURSIVE_NP);
-}
-#endif
-
-/**
- * Implementation of ReadWriteMutex class using POSIX rw lock
- *
- * @version $Id:$
- */
-class ReadWriteMutex::impl {
-public:
- impl() : initialized_(false) {
-#ifdef THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING
- profileTime_ = 0;
-#endif
- THROW_SRE_ONFAIL(pthread_rwlock_init(&rw_lock_, NULL));
- initialized_ = true;
- }
-
- ~impl() {
- if (initialized_) {
- initialized_ = false;
- ABORT_ONFAIL(pthread_rwlock_destroy(&rw_lock_));
- }
- }
-
- void acquireRead() const {
- PROFILE_MUTEX_START_LOCK();
- THROW_SRE_ONFAIL(pthread_rwlock_rdlock(&rw_lock_));
- PROFILE_MUTEX_NOT_LOCKED(); // not exclusive, so use not-locked path
- }
-
- void acquireWrite() const {
- PROFILE_MUTEX_START_LOCK();
- THROW_SRE_ONFAIL(pthread_rwlock_wrlock(&rw_lock_));
- PROFILE_MUTEX_LOCKED();
- }
-
- bool attemptRead() const { THROW_SRE_TRYFAIL(pthread_rwlock_tryrdlock(&rw_lock_)); }
-
- bool attemptWrite() const { THROW_SRE_TRYFAIL(pthread_rwlock_trywrlock(&rw_lock_)); }
-
- void release() const {
- PROFILE_MUTEX_START_UNLOCK();
- THROW_SRE_ONFAIL(pthread_rwlock_unlock(&rw_lock_));
- PROFILE_MUTEX_UNLOCKED();
- }
-
-private:
- mutable pthread_rwlock_t rw_lock_;
- mutable bool initialized_;
-#ifdef THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING
- mutable int64_t profileTime_;
-#endif
-};
-
-ReadWriteMutex::ReadWriteMutex() : impl_(new ReadWriteMutex::impl()) {
-}
-
-void ReadWriteMutex::acquireRead() const {
- impl_->acquireRead();
-}
-
-void ReadWriteMutex::acquireWrite() const {
- impl_->acquireWrite();
-}
-
-bool ReadWriteMutex::attemptRead() const {
- return impl_->attemptRead();
-}
-
-bool ReadWriteMutex::attemptWrite() const {
- return impl_->attemptWrite();
-}
-
-void ReadWriteMutex::release() const {
- impl_->release();
-}
-
-NoStarveReadWriteMutex::NoStarveReadWriteMutex() : writerWaiting_(false) {
-}
-
-void NoStarveReadWriteMutex::acquireRead() const {
- if (writerWaiting_) {
- // writer is waiting, block on the writer's mutex until he's done with it
- mutex_.lock();
- mutex_.unlock();
- }
-
- ReadWriteMutex::acquireRead();
-}
-
-void NoStarveReadWriteMutex::acquireWrite() const {
- // if we can acquire the rwlock the easy way, we're done
- if (attemptWrite()) {
- return;
- }
-
- // failed to get the rwlock, do it the hard way:
- // locking the mutex and setting writerWaiting will cause all new readers to
- // block on the mutex rather than on the rwlock.
- mutex_.lock();
- writerWaiting_ = true;
- ReadWriteMutex::acquireWrite();
- writerWaiting_ = false;
- mutex_.unlock();
-}
}
}
} // apache::thrift::concurrency
diff --git a/lib/cpp/src/thrift/concurrency/Mutex.h b/lib/cpp/src/thrift/concurrency/Mutex.h
index a1f539685..123ae785d 100644
--- a/lib/cpp/src/thrift/concurrency/Mutex.h
+++ b/lib/cpp/src/thrift/concurrency/Mutex.h
@@ -28,31 +28,6 @@ namespace apache {
namespace thrift {
namespace concurrency {
-#ifndef THRIFT_NO_CONTENTION_PROFILING
-
-/**
- * Determines if the Thrift Mutex and ReadWriteMutex classes will attempt to
- * profile their blocking acquire methods. If this value is set to non-zero,
- * Thrift will attempt to invoke the callback once every profilingSampleRate
- * times. However, as the sampling is not synchronized the rate is not
- * guranateed, and could be subject to big bursts and swings. Please ensure
- * your sampling callback is as performant as your application requires.
- *
- * The callback will get called with the wait time taken to lock the mutex in
- * usec and a (void*) that uniquely identifies the Mutex (or ReadWriteMutex)
- * being locked.
- *
- * The enableMutexProfiling() function is unsynchronized; calling this function
- * while profiling is already enabled may result in race conditions. On
- * architectures where a pointer assignment is atomic, this is safe but there
- * is no guarantee threads will agree on a single callback within any
- * particular time period.
- */
-typedef void (*MutexWaitCallback)(const void* id, int64_t waitTimeMicros);
-void enableMutexProfiling(int32_t profilingSampleRate, MutexWaitCallback callback);
-
-#endif
-
/**
* NOTE: All mutex implementations throw an exception on failure. See each
* specific implementation to understand the exception type(s) used.
@@ -65,9 +40,7 @@ void enableMutexProfiling(int32_t profilingSampleRate, MutexWaitCallback callbac
*/
class Mutex {
public:
- typedef void (*Initializer)(void*);
-
- Mutex(Initializer init = DEFAULT_INITIALIZER);
+ Mutex();
virtual ~Mutex() {}
virtual void lock() const;
@@ -77,57 +50,11 @@ public:
void* getUnderlyingImpl() const;
- // If you attempt to use one of these and it fails to link, it means
- // your version of pthreads does not support it - try another one.
- static void ADAPTIVE_INITIALIZER(void*);
- static void DEFAULT_INITIALIZER(void*);
- static void ERRORCHECK_INITIALIZER(void*);
- static void RECURSIVE_INITIALIZER(void*);
-
-private:
- class impl;
- std::shared_ptr<impl> impl_;
-};
-
-class ReadWriteMutex {
-public:
- ReadWriteMutex();
- virtual ~ReadWriteMutex() {}
-
- // these get the lock and block until it is done successfully
- virtual void acquireRead() const;
- virtual void acquireWrite() const;
-
- // these attempt to get the lock, returning false immediately if they fail
- virtual bool attemptRead() const;
- virtual bool attemptWrite() const;
-
- // this releases both read and write locks
- virtual void release() const;
-
private:
class impl;
std::shared_ptr<impl> impl_;
};
-/**
- * A ReadWriteMutex that guarantees writers will not be starved by readers:
- * When a writer attempts to acquire the mutex, all new readers will be
- * blocked from acquiring the mutex until the writer has acquired and
- * released it. In some operating systems, this may already be guaranteed
- * by a regular ReadWriteMutex.
- */
-class NoStarveReadWriteMutex : public ReadWriteMutex {
-public:
- NoStarveReadWriteMutex();
-
- virtual void acquireRead() const;
- virtual void acquireWrite() const;
-
-private:
- Mutex mutex_;
- mutable volatile bool writerWaiting_;
-};
class Guard : boost::noncopyable {
public:
@@ -156,32 +83,6 @@ private:
const Mutex* mutex_;
};
-// Can be used as second argument to RWGuard to make code more readable
-// as to whether we're doing acquireRead() or acquireWrite().
-enum RWGuardType { RW_READ = 0, RW_WRITE = 1 };
-
-class RWGuard : boost::noncopyable {
-public:
- RWGuard(const ReadWriteMutex& value, bool write = false) : rw_mutex_(value) {
- if (write) {
- rw_mutex_.acquireWrite();
- } else {
- rw_mutex_.acquireRead();
- }
- }
-
- RWGuard(const ReadWriteMutex& value, RWGuardType type) : rw_mutex_(value) {
- if (type == RW_WRITE) {
- rw_mutex_.acquireWrite();
- } else {
- rw_mutex_.acquireRead();
- }
- }
- ~RWGuard() { rw_mutex_.release(); }
-
-private:
- const ReadWriteMutex& rw_mutex_;
-};
}
}
} // apache::thrift::concurrency
diff --git a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp
deleted file mode 100644
index 5c5926977..000000000
--- a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <thrift/thrift-config.h>
-
-#include <thrift/concurrency/Exception.h>
-#include <thrift/concurrency/Monitor.h>
-#include <thrift/concurrency/PosixThreadFactory.h>
-
-#if GOOGLE_PERFTOOLS_REGISTER_THREAD
-#include <google/profiler.h>
-#endif
-
-#include <assert.h>
-#include <pthread.h>
-
-#include <iostream>
-
-#include <memory>
-
-namespace apache {
-namespace thrift {
-namespace concurrency {
-
-/**
- * The POSIX thread class.
- *
- * @version $Id:$
- */
-class PthreadThread : public Thread {
-public:
- enum STATE { uninitialized, starting, started, stopping, stopped };
-
- static const int MB = 1024 * 1024;
-
- static void* threadMain(void* arg);
-
-private:
- pthread_t pthread_;
- Monitor monitor_; // guard to protect state_ and also notification
- STATE state_; // to protect proper thread start behavior
- int policy_;
- int priority_;
- int stackSize_;
- std::weak_ptr<PthreadThread> self_;
- bool detached_;
-
-public:
- PthreadThread(int policy,
- int priority,
- int stackSize,
- bool detached,
- std::shared_ptr<Runnable> runnable)
- :
-
-#ifndef _WIN32
- pthread_(0),
-#endif // _WIN32
- state_(uninitialized),
- policy_(policy),
- priority_(priority),
- stackSize_(stackSize),
- detached_(detached) {
-
- this->Thread::runnable(runnable);
- }
-
- ~PthreadThread() {
- /* Nothing references this thread, if is is not detached, do a join
- now, otherwise the thread-id and, possibly, other resources will
- be leaked. */
- if (!detached_) {
- try {
- join();
- } catch (...) {
- // We're really hosed.
- }
- }
- }
-
- STATE getState() const
- {
- Synchronized sync(monitor_);
- return state_;
- }
-
- void setState(STATE newState)
- {
- Synchronized sync(monitor_);
- state_ = newState;
-
- // unblock start() with the knowledge that the thread has actually
- // started running, which avoids a race in detached threads.
- if (newState == started) {
- monitor_.notify();
- }
- }
-
- void start() {
- if (getState() != uninitialized) {
- return;
- }
-
- pthread_attr_t thread_attr;
- if (pthread_attr_init(&thread_attr) != 0) {
- throw SystemResourceException("pthread_attr_init failed");
- }
-
- if (pthread_attr_setdetachstate(&thread_attr,
- detached_ ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE)
- != 0) {
- throw SystemResourceException("pthread_attr_setdetachstate failed");
- }
-
- // Set thread stack size
- if (pthread_attr_setstacksize(&thread_attr, MB * stackSize_) != 0) {
- throw SystemResourceException("pthread_attr_setstacksize failed");
- }
-
-// Set thread policy
-#ifdef _WIN32
- // WIN32 Pthread implementation doesn't seem to support sheduling policies other then
- // PosixThreadFactory::OTHER - runtime error
- policy_ = PosixThreadFactory::OTHER;
-#endif
-
-#if _POSIX_THREAD_PRIORITY_SCHEDULING > 0
- if (pthread_attr_setschedpolicy(&thread_attr, policy_) != 0) {
- throw SystemResourceException("pthread_attr_setschedpolicy failed");
- }
-#endif
-
- struct sched_param sched_param;
- sched_param.sched_priority = priority_;
-
- // Set thread priority
- if (pthread_attr_setschedparam(&thread_attr, &sched_param) != 0) {
- throw SystemResourceException("pthread_attr_setschedparam failed");
- }
-
- // Create reference
- std::shared_ptr<PthreadThread>* selfRef = new std::shared_ptr<PthreadThread>();
- *selfRef = self_.lock();
-
- setState(starting);
-
- Synchronized sync(monitor_);
-
- if (pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) {
- throw SystemResourceException("pthread_create failed");
- }
-
- // The caller may not choose to guarantee the scope of the Runnable
- // being used in the thread, so we must actually wait until the thread
- // starts before we return. If we do not wait, it would be possible
- // for the caller to start destructing the Runnable and the Thread,
- // and we would end up in a race. This was identified with valgrind.
- monitor_.wait();
- }
-
- void join() {
- if (!detached_ && getState() != uninitialized) {
- void* ignore;
- /* XXX
- If join fails it is most likely due to the fact
- that the last reference was the thread itself and cannot
- join. This results in leaked threads and will eventually
- cause the process to run out of thread resources.
- We're beyond the point of throwing an exception. Not clear how
- best to handle this. */
- int res = pthread_join(pthread_, &ignore);
- detached_ = (res == 0);
- if (res != 0) {
- GlobalOutput.printf("PthreadThread::join(): fail with code %d", res);
- }
- }
- }
-
- Thread::id_t getId() {
-
-#ifndef _WIN32
- return (Thread::id_t)pthread_;
-#else
- return (Thread::id_t)pthread_.p;
-#endif // _WIN32
- }
-
- std::shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
-
- void runnable(std::shared_ptr<Runnable> value) { Thread::runnable(value); }
-
- void weakRef(std::shared_ptr<PthreadThread> self) {
- assert(self.get() == this);
- self_ = std::weak_ptr<PthreadThread>(self);
- }
-};
-
-void* PthreadThread::threadMain(void* arg) {
- std::shared_ptr<PthreadThread> thread = *(std::shared_ptr<PthreadThread>*)arg;
- delete reinterpret_cast<std::shared_ptr<PthreadThread>*>(arg);
-
-#if GOOGLE_PERFTOOLS_REGISTER_THREAD
- ProfilerRegisterThread();
-#endif
-
- thread->setState(started);
-
- thread->runnable()->run();
-
- STATE _s = thread->getState();
- if (_s != stopping && _s != stopped) {
- thread->setState(stopping);
- }
-
- return (void*)0;
-}
-
-/**
- * Converts generic posix thread schedule policy enums into pthread
- * API values.
- */
-static int toPthreadPolicy(PosixThreadFactory::POLICY policy) {
- switch (policy) {
- case PosixThreadFactory::OTHER:
- return SCHED_OTHER;
- case PosixThreadFactory::FIFO:
- return SCHED_FIFO;
- case PosixThreadFactory::ROUND_ROBIN:
- return SCHED_RR;
- }
- return SCHED_OTHER;
-}
-
-/**
- * Converts relative thread priorities to absolute value based on posix
- * thread scheduler policy
- *
- * The idea is simply to divide up the priority range for the given policy
- * into the correpsonding relative priority level (lowest..highest) and
- * then pro-rate accordingly.
- */
-static int toPthreadPriority(PosixThreadFactory::POLICY policy, PosixThreadFactory::PRIORITY priority) {
- int pthread_policy = toPthreadPolicy(policy);
- int min_priority = 0;
- int max_priority = 0;
-#ifdef HAVE_SCHED_GET_PRIORITY_MIN
- min_priority = sched_get_priority_min(pthread_policy);
-#endif
-#ifdef HAVE_SCHED_GET_PRIORITY_MAX
- max_priority = sched_get_priority_max(pthread_policy);
-#endif
- int quanta = (PosixThreadFactory::HIGHEST - PosixThreadFactory::LOWEST) + 1;
- float stepsperquanta = (float)(max_priority - min_priority) / quanta;
-
- if (priority <= PosixThreadFactory::HIGHEST) {
- return (int)(min_priority + stepsperquanta * priority);
- } else {
- // should never get here for priority increments.
- assert(false);
- return (int)(min_priority + stepsperquanta * PosixThreadFactory::NORMAL);
- }
-}
-
-PosixThreadFactory::PosixThreadFactory(POLICY policy,
- PRIORITY priority,
- int stackSize,
- bool detached)
- : ThreadFactory(detached),
- policy_(policy),
- priority_(priority),
- stackSize_(stackSize) {
-}
-
-PosixThreadFactory::PosixThreadFactory(bool detached)
- : ThreadFactory(detached),
- policy_(ROUND_ROBIN),
- priority_(NORMAL),
- stackSize_(1) {
-}
-
-std::shared_ptr<Thread> PosixThreadFactory::newThread(std::shared_ptr<Runnable> runnable) const {
- std::shared_ptr<PthreadThread> result
- = std::shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_),
- toPthreadPriority(policy_, priority_),
- stackSize_,
- isDetached(),
- runnable));
- result->weakRef(result);
- runnable->thread(result);
- return result;
-}
-
-int PosixThreadFactory::getStackSize() const {
- return stackSize_;
-}
-
-void PosixThreadFactory::setStackSize(int value) {
- stackSize_ = value;
-}
-
-PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const {
- return priority_;
-}
-
-void PosixThreadFactory::setPriority(PRIORITY value) {
- priority_ = value;
-}
-
-Thread::id_t PosixThreadFactory::getCurrentThreadId() const {
-#ifndef _WIN32
- return (Thread::id_t)pthread_self();
-#else
- return (Thread::id_t)pthread_self().p;
-#endif // _WIN32
-}
-
-}
-}
-} // apache::thrift::concurrency
diff --git a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h
deleted file mode 100644
index cb3b17c9c..000000000
--- a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#ifndef _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_
-#define _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_ 1
-
-#include <thrift/concurrency/Thread.h>
-
-#include <memory>
-
-namespace apache {
-namespace thrift {
-namespace concurrency {
-
-/**
- * A thread factory to create posix threads
- *
- * @version $Id:$
- */
-class PosixThreadFactory : public ThreadFactory {
-
-public:
- /**
- * POSIX Thread scheduler policies
- */
- enum POLICY { OTHER, FIFO, ROUND_ROBIN };
-
- /**
- * POSIX Thread scheduler relative priorities,
- *
- * Absolute priority is determined by scheduler policy and OS. This
- * enumeration specifies relative priorities such that one can specify a
- * priority within a giving scheduler policy without knowing the absolute
- * value of the priority.
- */
- enum PRIORITY {
- LOWEST = 0,
- LOWER = 1,
- LOW = 2,
- NORMAL = 3,
- HIGH = 4,
- HIGHER = 5,
- HIGHEST = 6,
- INCREMENT = 7,
- DECREMENT = 8
- };
-
- /**
- * Posix thread (pthread) factory. All threads created by a factory are reference-counted
- * via std::shared_ptr. The factory guarantees that threads and the Runnable tasks
- * they host will be properly cleaned up once the last strong reference to both is
- * given up.
- *
- * Threads are created with the specified policy, priority, stack-size and detachable-mode
- * detached means the thread is free-running and will release all system resources the
- * when it completes. A detachable thread is not joinable. The join method
- * of a detachable thread will return immediately with no error.
- *
- * By default threads are not joinable.
- */
- PosixThreadFactory(POLICY policy = ROUND_ROBIN,
- PRIORITY priority = NORMAL,
- int stackSize = 1,
- bool detached = true);
-
- /**
- * Provide a constructor compatible with the other factories
- * The default policy is POLICY::ROUND_ROBIN.
- * The default priority is PRIORITY::NORMAL.
- * The default stackSize is 1.
- */
- PosixThreadFactory(bool detached);
-
- // From ThreadFactory;
- std::shared_ptr<Thread> newThread(std::shared_ptr<Runnable> runnable) const;
-
- // From ThreadFactory;
- Thread::id_t getCurrentThreadId() const;
-
- /**
- * Gets stack size for newly created threads
- *
- * @return int size in megabytes
- */
- virtual int getStackSize() const;
-
- /**
- * Sets stack size for newly created threads
- *
- * @param value size in megabytes
- */
- virtual void setStackSize(int value);
-
- /**
- * Gets priority relative to current policy
- */
- virtual PRIORITY getPriority() const;
-
- /**
- * Sets priority relative to current policy
- */
- virtual void setPriority(PRIORITY priority);
-
-private:
- POLICY policy_;
- PRIORITY priority_;
- int stackSize_;
-};
-}
-}
-} // apache::thrift::concurrency
-
-#endif // #ifndef _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_
diff --git a/lib/cpp/src/thrift/concurrency/StdMonitor.cpp b/lib/cpp/src/thrift/concurrency/StdMonitor.cpp
deleted file mode 100644
index 7b3b209a7..000000000
--- a/lib/cpp/src/thrift/concurrency/StdMonitor.cpp
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <thrift/thrift-config.h>
-
-#include <thrift/concurrency/Monitor.h>
-#include <thrift/concurrency/Exception.h>
-#include <thrift/concurrency/Util.h>
-#include <thrift/transport/PlatformSocket.h>
-#include <assert.h>
-
-#include <condition_variable>
-#include <chrono>
-#include <thread>
-#include <mutex>
-
-namespace apache {
-namespace thrift {
-namespace concurrency {
-
-/**
- * Monitor implementation using the std thread library
- *
- * @version $Id:$
- */
-class Monitor::Impl {
-
-public:
- Impl() : ownedMutex_(new Mutex()), conditionVariable_(), mutex_(NULL) { init(ownedMutex_.get()); }
-
- Impl(Mutex* mutex) : ownedMutex_(), conditionVariable_(), mutex_(NULL) { init(mutex); }
-
- Impl(Monitor* monitor) : ownedMutex_(), conditionVariable_(), mutex_(NULL) {
- init(&(monitor->mutex()));
- }
-
- Mutex& mutex() { return *mutex_; }
- void lock() { mutex_->lock(); }
- void unlock() { mutex_->unlock(); }
-
- /**
- * Exception-throwing version of waitForTimeRelative(), called simply
- * wait(int64) for historical reasons. Timeout is in milliseconds.
- *
- * If the condition occurs, this function returns cleanly; on timeout or
- * error an exception is thrown.
- */
- void wait(int64_t timeout_ms) {
- int result = waitForTimeRelative(timeout_ms);
- if (result == THRIFT_ETIMEDOUT) {
- throw TimedOutException();
- } else if (result != 0) {
- throw TException("Monitor::wait() failed");
- }
- }
-
- /**
- * Waits until the specified timeout in milliseconds for the condition to
- * occur, or waits forever if timeout_ms == 0.
- *
- * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
- */
- int waitForTimeRelative(int64_t timeout_ms) {
- if (timeout_ms == 0LL) {
- return waitForever();
- }
-
- assert(mutex_);
- std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
- assert(mutexImpl);
-
- std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
- bool timedout = (conditionVariable_.wait_for(lock, std::chrono::milliseconds(timeout_ms))
- == std::cv_status::timeout);
- lock.release();
- return (timedout ? THRIFT_ETIMEDOUT : 0);
- }
-
- /**
- * Waits until the absolute time specified using struct THRIFT_TIMESPEC.
- * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
- */
- int waitForTime(const THRIFT_TIMESPEC* abstime) {
- struct timeval temp;
- temp.tv_sec = static_cast<long>(abstime->tv_sec);
- temp.tv_usec = static_cast<long>(abstime->tv_nsec) / 1000;
- return waitForTime(&temp);
- }
-
- /**
- * Waits until the absolute time specified using struct timeval.
- * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
- */
- int waitForTime(const struct timeval* abstime) {
- assert(mutex_);
- std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
- assert(mutexImpl);
-
- struct timeval currenttime;
- Util::toTimeval(currenttime, Util::currentTime());
-
- long tv_sec = static_cast<long>(abstime->tv_sec - currenttime.tv_sec);
- long tv_usec = static_cast<long>(abstime->tv_usec - currenttime.tv_usec);
- if (tv_sec < 0)
- tv_sec = 0;
- if (tv_usec < 0)
- tv_usec = 0;
-
- std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
- bool timedout = (conditionVariable_.wait_for(lock,
- std::chrono::seconds(tv_sec)
- + std::chrono::microseconds(tv_usec))
- == std::cv_status::timeout);
- lock.release();
- return (timedout ? THRIFT_ETIMEDOUT : 0);
- }
-
- /**
- * Waits forever until the condition occurs.
- * Returns 0 if condition occurs, or an error code otherwise.
- */
- int waitForever() {
- assert(mutex_);
- std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
- assert(mutexImpl);
-
- std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
- conditionVariable_.wait(lock);
- lock.release();
- return 0;
- }
-
- void notify() { conditionVariable_.notify_one(); }
-
- void notifyAll() { conditionVariable_.notify_all(); }
-
-private:
- void init(Mutex* mutex) { mutex_ = mutex; }
-
- const std::unique_ptr<Mutex> ownedMutex_;
- std::condition_variable_any conditionVariable_;
- Mutex* mutex_;
-};
-
-Monitor::Monitor() : impl_(new Monitor::Impl()) {
-}
-Monitor::Monitor(Mutex* mutex) : impl_(new Monitor::Impl(mutex)) {
-}
-Monitor::Monitor(Monitor* monitor) : impl_(new Monitor::Impl(monitor)) {
-}
-
-Monitor::~Monitor() {
- delete impl_;
-}
-
-Mutex& Monitor::mutex() const {
- return const_cast<Monitor::Impl*>(impl_)->mutex();
-}
-
-void Monitor::lock() const {
- const_cast<Monitor::Impl*>(impl_)->lock();
-}
-
-void Monitor::unlock() const {
- const_cast<Monitor::Impl*>(impl_)->unlock();
-}
-
-void Monitor::wait(int64_t timeout) const {
- const_cast<Monitor::Impl*>(impl_)->wait(timeout);
-}
-
-int Monitor::waitForTime(const THRIFT_TIMESPEC* abstime) const {
- return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime);
-}
-
-int Monitor::waitForTime(const timeval* abstime) const {
- return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime);
-}
-
-int Monitor::waitForTimeRelative(int64_t timeout_ms) const {
- return const_cast<Monitor::Impl*>(impl_)->waitForTimeRelative(timeout_ms);
-}
-
-int Monitor::waitForever() const {
- return const_cast<Monitor::Impl*>(impl_)->waitForever();
-}
-
-void Monitor::notify() const {
- const_cast<Monitor::Impl*>(impl_)->notify();
-}
-
-void Monitor::notifyAll() const {
- const_cast<Monitor::Impl*>(impl_)->notifyAll();
-}
-}
-}
-} // apache::thrift::concurrency
diff --git a/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp b/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp
deleted file mode 100644
index c885f3aca..000000000
--- a/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <thrift/thrift-config.h>
-
-#if USE_STD_THREAD
-
-#include <thrift/concurrency/Exception.h>
-#include <thrift/concurrency/Monitor.h>
-#include <thrift/concurrency/StdThreadFactory.h>
-#include <memory>
-
-#include <cassert>
-#include <thread>
-
-namespace apache {
-namespace thrift {
-namespace concurrency {
-
-/**
- * The C++11 thread class.
- *
- * Note that we use boost shared_ptr rather than std shared_ptrs here
- * because the Thread/Runnable classes use those and we don't want to
- * mix them.
- *
- * @version $Id:$
- */
-class StdThread : public Thread, public std::enable_shared_from_this<StdThread> {
-public:
- enum STATE { uninitialized, starting, started, stopping, stopped };
-
- static void threadMain(std::shared_ptr<StdThread> thread);
-
-private:
- std::unique_ptr<std::thread> thread_;
- Monitor monitor_;
- STATE state_;
- bool detached_;
-
-public:
- StdThread(bool detached, std::shared_ptr<Runnable> runnable)
- : state_(uninitialized), detached_(detached) {
- this->Thread::runnable(runnable);
- }
-
- ~StdThread() {
- if (!detached_ && thread_->joinable()) {
- try {
- join();
- } catch (...) {
- // We're really hosed.
- }
- }
- }
-
- STATE getState() const
- {
- Synchronized sync(monitor_);
- return state_;
- }
-
- void setState(STATE newState)
- {
- Synchronized sync(monitor_);
- state_ = newState;
-
- // unblock start() with the knowledge that the thread has actually
- // started running, which avoids a race in detached threads.
- if (newState == started) {
- monitor_.notify();
- }
- }
-
- void start() {
- if (getState() != uninitialized) {
- return;
- }
-
- std::shared_ptr<StdThread> selfRef = shared_from_this();
- setState(starting);
-
- Synchronized sync(monitor_);
- thread_ = std::unique_ptr<std::thread>(new std::thread(threadMain, selfRef));
-
- if (detached_)
- thread_->detach();
-
- // Wait for the thread to start and get far enough to grab everything
- // that it needs from the calling context, thus absolving the caller
- // from being required to hold on to runnable indefinitely.
- monitor_.wait();
- }
-
- void join() {
- if (!detached_ && state_ != uninitialized) {
- thread_->join();
- }
- }
-
- Thread::id_t getId() { return thread_.get() ? thread_->get_id() : std::thread::id(); }
-
- std::shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
-
- void runnable(std::shared_ptr<Runnable> value) { Thread::runnable(value); }
-};
-
-void StdThread::threadMain(std::shared_ptr<StdThread> thread) {
-#if GOOGLE_PERFTOOLS_REGISTER_THREAD
- ProfilerRegisterThread();
-#endif
-
- thread->setState(started);
- thread->runnable()->run();
-
- if (thread->getState() != stopping && thread->getState() != stopped) {
- thread->setState(stopping);
- }
-}
-
-StdThreadFactory::StdThreadFactory(bool detached) : ThreadFactory(detached) {
-}
-
-std::shared_ptr<Thread> StdThreadFactory::newThread(std::shared_ptr<Runnable> runnable) const {
- std::shared_ptr<StdThread> result = std::shared_ptr<StdThread>(new StdThread(isDetached(), runnable));
- runnable->thread(result);
- return result;
-}
-
-Thread::id_t StdThreadFactory::getCurrentThreadId() const {
- return std::this_thread::get_id();
-}
-}
-}
-} // apache::thrift::concurrency
-
-#endif // USE_STD_THREAD
diff --git a/lib/cpp/src/thrift/concurrency/StdMutex.cpp b/lib/cpp/src/thrift/concurrency/Thread.cpp
index e0f79fa37..a2bb1270f 100644
--- a/lib/cpp/src/thrift/concurrency/StdMutex.cpp
+++ b/lib/cpp/src/thrift/concurrency/Thread.cpp
@@ -17,55 +17,21 @@
* under the License.
*/
-#include <thrift/thrift-config.h>
-
-#include <thrift/concurrency/Mutex.h>
-#include <thrift/concurrency/Util.h>
-
-#include <cassert>
-#include <chrono>
-#include <mutex>
+#include <thrift/concurrency/Thread.h>
namespace apache {
namespace thrift {
namespace concurrency {
-/**
- * Implementation of Mutex class using C++11 std::timed_mutex
- *
- * Methods throw std::system_error on error.
- *
- * @version $Id:$
- */
-class Mutex::impl : public std::timed_mutex {};
-
-Mutex::Mutex(Initializer init) : impl_(new Mutex::impl()) {
- ((void)init);
-}
-
-void* Mutex::getUnderlyingImpl() const {
- return impl_.get();
-}
-
-void Mutex::lock() const {
- impl_->lock();
-}
-
-bool Mutex::trylock() const {
- return impl_->try_lock();
-}
-
-bool Mutex::timedlock(int64_t ms) const {
- return impl_->try_lock_for(std::chrono::milliseconds(ms));
-}
+void Thread::threadMain(std::shared_ptr<Thread> thread) {
+ thread->setState(started);
+ thread->runnable()->run();
-void Mutex::unlock() const {
- impl_->unlock();
+ if (thread->getState() != stopping && thread->getState() != stopped) {
+ thread->setState(stopping);
+ }
}
-void Mutex::DEFAULT_INITIALIZER(void* arg) {
- ((void)arg);
-}
}
}
} // apache::thrift::concurrency
diff --git a/lib/cpp/src/thrift/concurrency/Thread.h b/lib/cpp/src/thrift/concurrency/Thread.h
index b2ea4e297..729d11a48 100644
--- a/lib/cpp/src/thrift/concurrency/Thread.h
+++ b/lib/cpp/src/thrift/concurrency/Thread.h
@@ -22,16 +22,10 @@
#include <stdint.h>
#include <memory>
+#include <thread>
#include <thrift/thrift-config.h>
-
-#if USE_STD_THREAD
-#include <thread>
-#else
-#ifdef HAVE_PTHREAD_H
-#include <pthread.h>
-#endif
-#endif
+#include <thrift/concurrency/Monitor.h>
namespace apache {
namespace thrift {
@@ -75,94 +69,106 @@ private:
*
* @see apache::thrift::concurrency::ThreadFactory)
*/
-class Thread {
+class Thread final : public std::enable_shared_from_this<Thread> {
public:
-#if USE_STD_THREAD
typedef std::thread::id id_t;
+ enum STATE { uninitialized, starting, started, stopping, stopped };
+
+ static void threadMain(std::shared_ptr<Thread> thread);
+
static inline bool is_current(id_t t) { return t == std::this_thread::get_id(); }
static inline id_t get_current() { return std::this_thread::get_id(); }
-#else
- typedef pthread_t id_t;
-
- static inline bool is_current(id_t t) { return pthread_equal(pthread_self(), t); }
- static inline id_t get_current() { return pthread_self(); }
-#endif
- virtual ~Thread(){};
+ Thread(bool detached, std::shared_ptr<Runnable> runnable)
+ : state_(uninitialized), detached_(detached) {
+ this->_runnable = runnable;
+ }
+
+ ~Thread() {
+ if (!detached_ && thread_->joinable()) {
+ try {
+ join();
+ } catch (...) {
+ // We're really hosed.
+ }
+ }
+ }
+
+ STATE getState() const
+ {
+ Synchronized sync(monitor_);
+ return state_;
+ }
+
+ void setState(STATE newState)
+ {
+ Synchronized sync(monitor_);
+ state_ = newState;
+
+ // unblock start() with the knowledge that the thread has actually
+ // started running, which avoids a race in detached threads.
+ if (newState == started) {
+ monitor_.notify();
+ }
+ }
/**
* Starts the thread. Does platform specific thread creation and
* configuration then invokes the run method of the Runnable object bound
* to this thread.
*/
- virtual void start() = 0;
+ void start() {
+ if (getState() != uninitialized) {
+ return;
+ }
+
+ std::shared_ptr<Thread> selfRef = shared_from_this();
+ setState(starting);
+
+ Synchronized sync(monitor_);
+ thread_ = std::unique_ptr<std::thread>(new std::thread(threadMain, selfRef));
+
+ if (detached_)
+ thread_->detach();
+
+ // Wait for the thread to start and get far enough to grab everything
+ // that it needs from the calling context, thus absolving the caller
+ // from being required to hold on to runnable indefinitely.
+ monitor_.wait();
+ }
/**
* Join this thread. If this thread is joinable, the calling thread blocks
* until this thread completes. If the target thread is not joinable, then
* nothing happens.
*/
- virtual void join() = 0;
+ void join() {
+ if (!detached_ && state_ != uninitialized) {
+ thread_->join();
+ }
+ }
/**
* Gets the thread's platform-specific ID
*/
- virtual id_t getId() = 0;
+ Thread::id_t getId() { return thread_.get() ? thread_->get_id() : std::thread::id(); }
/**
* Gets the runnable object this thread is hosting
*/
- virtual std::shared_ptr<Runnable> runnable() const { return _runnable; }
-
-protected:
- virtual void runnable(std::shared_ptr<Runnable> value) { _runnable = value; }
+ std::shared_ptr<Runnable> runnable() const { return _runnable; }
private:
std::shared_ptr<Runnable> _runnable;
-};
-
-/**
- * Factory to create platform-specific thread object and bind them to Runnable
- * object for execution
- */
-class ThreadFactory {
-protected:
- ThreadFactory(bool detached) : detached_(detached) { }
-
-public:
- virtual ~ThreadFactory() { }
-
- /**
- * Gets current detached mode
- */
- bool isDetached() const { return detached_; }
-
- /**
- * Sets the detached disposition of newly created threads.
- */
- void setDetached(bool detached) { detached_ = detached; }
-
- /**
- * Create a new thread.
- */
- virtual std::shared_ptr<Thread> newThread(std::shared_ptr<Runnable> runnable) const = 0;
-
- /**
- * Gets the current thread id or unknown_thread_id if the current thread is not a thrift thread
- */
- virtual Thread::id_t getCurrentThreadId() const = 0;
-
- /**
- * For code readability define the unknown/undefined thread id
- */
- static const Thread::id_t unknown_thread_id;
-
-private:
+ std::unique_ptr<std::thread> thread_;
+ Monitor monitor_;
+ STATE state_;
bool detached_;
};
+
}
}
} // apache::thrift::concurrency
diff --git a/lib/cpp/src/thrift/concurrency/PlatformThreadFactory.h b/lib/cpp/src/thrift/concurrency/ThreadFactory.cpp
index 99b44033b..941b99371 100644
--- a/lib/cpp/src/thrift/concurrency/PlatformThreadFactory.h
+++ b/lib/cpp/src/thrift/concurrency/ThreadFactory.cpp
@@ -17,32 +17,24 @@
* under the License.
*/
-#ifndef _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_
-#define _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_ 1
-
-// clang-format off
#include <thrift/thrift-config.h>
-#if USE_STD_THREAD
-# include <thrift/concurrency/StdThreadFactory.h>
-#else
-# include <thrift/concurrency/PosixThreadFactory.h>
-#endif
-// clang-format on
+
+#include <thrift/concurrency/ThreadFactory.h>
+#include <memory>
namespace apache {
namespace thrift {
namespace concurrency {
-// clang-format off
-#if USE_STD_THREAD
- typedef StdThreadFactory PlatformThreadFactory;
-#else
- typedef PosixThreadFactory PlatformThreadFactory;
-#endif
-// clang-format on
+std::shared_ptr<Thread> ThreadFactory::newThread(std::shared_ptr<Runnable> runnable) const {
+ std::shared_ptr<Thread> result = std::shared_ptr<Thread>(new Thread(isDetached(), runnable));
+ runnable->thread(result);
+ return result;
+}
+Thread::id_t ThreadFactory::getCurrentThreadId() const {
+ return std::this_thread::get_id();
+}
}
}
} // apache::thrift::concurrency
-
-#endif // #ifndef _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_
diff --git a/lib/cpp/src/thrift/concurrency/StdThreadFactory.h b/lib/cpp/src/thrift/concurrency/ThreadFactory.h
index e74046b7b..f317afcde 100644
--- a/lib/cpp/src/thrift/concurrency/StdThreadFactory.h
+++ b/lib/cpp/src/thrift/concurrency/ThreadFactory.h
@@ -17,45 +17,65 @@
* under the License.
*/
-#ifndef _THRIFT_CONCURRENCY_STDTHREADFACTORY_H_
-#define _THRIFT_CONCURRENCY_STDTHREADFACTORY_H_ 1
+#ifndef _THRIFT_CONCURRENCY_THREADFACTORY_H_
+#define _THRIFT_CONCURRENCY_THREADFACTORY_H_ 1
#include <thrift/concurrency/Thread.h>
#include <memory>
-
namespace apache {
namespace thrift {
namespace concurrency {
/**
- * A thread factory to create std::threads.
- *
- * @version $Id:$
+ * Factory to create thread object and bind them to Runnable
+ * object for execution
*/
-class StdThreadFactory : public ThreadFactory {
-
+class ThreadFactory final {
public:
/**
- * Std thread factory. All threads created by a factory are reference-counted
+ * All threads created by a factory are reference-counted
* via std::shared_ptr. The factory guarantees that threads and the Runnable tasks
* they host will be properly cleaned up once the last strong reference
* to both is given up.
*
* By default threads are not joinable.
*/
+ ThreadFactory(bool detached = true) : detached_(detached) { }
- StdThreadFactory(bool detached = true);
+ ~ThreadFactory() = default;
+
+ /**
+ * Gets current detached mode
+ */
+ bool isDetached() const { return detached_; }
+
+ /**
+ * Sets the detached disposition of newly created threads.
+ */
+ void setDetached(bool detached) { detached_ = detached; }
- // From ThreadFactory;
+ /**
+ * Create a new thread.
+ */
std::shared_ptr<Thread> newThread(std::shared_ptr<Runnable> runnable) const;
- // From ThreadFactory;
+ /**
+ * Gets the current thread id or unknown_thread_id if the current thread is not a thrift thread
+ */
Thread::id_t getCurrentThreadId() const;
+
+ /**
+ * For code readability define the unknown/undefined thread id
+ */
+ static const Thread::id_t unknown_thread_id;
+
+private:
+ bool detached_;
};
}
}
} // apache::thrift::concurrency
-#endif // #ifndef _THRIFT_CONCURRENCY_STDTHREADFACTORY_H_
+#endif // #ifndef _THRIFT_CONCURRENCY_THREADFACTORY_H_
diff --git a/lib/cpp/src/thrift/concurrency/ThreadManager.h b/lib/cpp/src/thrift/concurrency/ThreadManager.h
index 470fc0aae..4b4b3d491 100644
--- a/lib/cpp/src/thrift/concurrency/ThreadManager.h
+++ b/lib/cpp/src/thrift/concurrency/ThreadManager.h
@@ -23,7 +23,7 @@
#include <functional>
#include <memory>
#include <sys/types.h>
-#include <thrift/concurrency/Thread.h>
+#include <thrift/concurrency/ThreadFactory.h>
namespace apache {
namespace thrift {
diff --git a/lib/cpp/src/thrift/concurrency/TimerManager.h b/lib/cpp/src/thrift/concurrency/TimerManager.h
index ba792264c..4d73b002d 100644
--- a/lib/cpp/src/thrift/concurrency/TimerManager.h
+++ b/lib/cpp/src/thrift/concurrency/TimerManager.h
@@ -22,7 +22,7 @@
#include <thrift/concurrency/Exception.h>
#include <thrift/concurrency/Monitor.h>
-#include <thrift/concurrency/Thread.h>
+#include <thrift/concurrency/ThreadFactory.h>
#include <memory>
#include <map>
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
index f16fce789..31ff2a96a 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
@@ -22,7 +22,7 @@
#include <thrift/server/TNonblockingServer.h>
#include <thrift/concurrency/Exception.h>
#include <thrift/transport/TSocket.h>
-#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/transport/PlatformSocket.h>
#include <algorithm>
@@ -1118,12 +1118,7 @@ void TNonblockingServer::registerEvents(event_base* user_event_base) {
// Launch all the secondary IO threads in separate threads
if (ioThreads_.size() > 1) {
- ioThreadFactory_.reset(new PlatformThreadFactory(
-#if !USE_STD_THREAD
- PlatformThreadFactory::OTHER, // scheduler
- PlatformThreadFactory::NORMAL, // priority
- 1, // stack size (MB)
-#endif
+ ioThreadFactory_.reset(new ThreadFactory(
false // detached
));
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.h b/lib/cpp/src/thrift/server/TNonblockingServer.h
index e79c24f62..2c2389c3c 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.h
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.h
@@ -30,7 +30,7 @@
#include <thrift/concurrency/ThreadManager.h>
#include <climits>
#include <thrift/concurrency/Thread.h>
-#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/Mutex.h>
#include <stack>
#include <vector>
@@ -53,7 +53,7 @@ using apache::thrift::transport::TNonblockingServerTransport;
using apache::thrift::protocol::TProtocol;
using apache::thrift::concurrency::Runnable;
using apache::thrift::concurrency::ThreadManager;
-using apache::thrift::concurrency::PlatformThreadFactory;
+using apache::thrift::concurrency::ThreadFactory;
using apache::thrift::concurrency::ThreadFactory;
using apache::thrift::concurrency::Thread;
using apache::thrift::concurrency::Mutex;
@@ -166,7 +166,7 @@ private:
bool threadPoolProcessing_;
// Factory to create the IO threads
- std::shared_ptr<PlatformThreadFactory> ioThreadFactory_;
+ std::shared_ptr<ThreadFactory> ioThreadFactory_;
// Vector of IOThread objects that will handle our IO
std::vector<std::shared_ptr<TNonblockingIOThread> > ioThreads_;
@@ -386,9 +386,7 @@ public:
/**
* Sets the number of IO threads used by this server. Can only be used before
- * the call to serve() and has no effect afterwards. We always use a
- * PosixThreadFactory for the IO worker threads, because they must joinable
- * for clean shutdown.
+ * the call to serve() and has no effect afterwards.
*/
void setNumIOThreads(size_t numThreads) {
numIOThreads_ = numThreads;
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.cpp b/lib/cpp/src/thrift/server/TThreadedServer.cpp
index 2264df79b..ed2d80d00 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadedServer.cpp
@@ -19,7 +19,7 @@
#include <string>
#include <memory>
-#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/server/TThreadedServer.h>
namespace apache {
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.h b/lib/cpp/src/thrift/server/TThreadedServer.h
index c5ccd03c8..9fc9d1125 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.h
+++ b/lib/cpp/src/thrift/server/TThreadedServer.h
@@ -22,7 +22,7 @@
#include <map>
#include <thrift/concurrency/Monitor.h>
-#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/Thread.h>
#include <thrift/server/TServerFramework.h>
@@ -44,7 +44,7 @@ public:
const std::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory,
const std::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory
= std::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
- new apache::thrift::concurrency::PlatformThreadFactory(false)));
+ new apache::thrift::concurrency::ThreadFactory(false)));
TThreadedServer(
const std::shared_ptr<apache::thrift::TProcessor>& processor,
@@ -53,7 +53,7 @@ public:
const std::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory,
const std::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory
= std::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
- new apache::thrift::concurrency::PlatformThreadFactory(false)));
+ new apache::thrift::concurrency::ThreadFactory(false)));
TThreadedServer(
const std::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory,
@@ -64,7 +64,7 @@ public:
const std::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory,
const std::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory
= std::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
- new apache::thrift::concurrency::PlatformThreadFactory(false)));
+ new apache::thrift::concurrency::ThreadFactory(false)));
TThreadedServer(
const std::shared_ptr<apache::thrift::TProcessor>& processor,
@@ -75,7 +75,7 @@ public:
const std::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory,
const std::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory
= std::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
- new apache::thrift::concurrency::PlatformThreadFactory(false)));
+ new apache::thrift::concurrency::ThreadFactory(false)));
virtual ~TThreadedServer();
diff --git a/lib/cpp/src/thrift/transport/TFileTransport.h b/lib/cpp/src/thrift/transport/TFileTransport.h
index 6cc7bd24b..ece271aae 100644
--- a/lib/cpp/src/thrift/transport/TFileTransport.h
+++ b/lib/cpp/src/thrift/transport/TFileTransport.h
@@ -30,7 +30,7 @@
#include <thrift/concurrency/Mutex.h>
#include <thrift/concurrency/Monitor.h>
-#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/Thread.h>
namespace apache {
@@ -336,7 +336,7 @@ private:
static const uint32_t DEFAULT_WRITER_THREAD_SLEEP_TIME_US = 60 * 1000 * 1000;
// writer thread
- apache::thrift::concurrency::PlatformThreadFactory threadFactory_;
+ apache::thrift::concurrency::ThreadFactory threadFactory_;
std::shared_ptr<apache::thrift::concurrency::Thread> writerThread_;
// buffers to hold data before it is flushed. Each element of the buffer stores a msg that
diff --git a/lib/cpp/src/thrift/windows/config.h b/lib/cpp/src/thrift/windows/config.h
index a5f44577d..14a3f4f36 100644
--- a/lib/cpp/src/thrift/windows/config.h
+++ b/lib/cpp/src/thrift/windows/config.h
@@ -28,11 +28,6 @@
#error "This is a Windows header only"
#endif
-// use std::thread in MSVC11 (2012) or newer and in MinGW
-#if (_MSC_VER >= 1700) || defined(__MINGW32__)
-#define USE_STD_THREAD 1
-#endif
-
// Something that defines PRId64 is required to build
#define HAVE_INTTYPES_H 1