From ca8af9b48f9e62edc57c4a233c6377144214ef5a Mon Sep 17 00:00:00 2001 From: cyy Date: Fri, 11 Jan 2019 22:13:12 +0800 Subject: THRIFT-4730: remove pthread code and refactor, ending up with just ThreadFactory --- lib/cpp/CMakeLists.txt | 27 +- lib/cpp/Makefile.am | 12 +- lib/cpp/src/thrift/concurrency/Monitor.cpp | 164 +++++----- lib/cpp/src/thrift/concurrency/Mutex.cpp | 335 +-------------------- lib/cpp/src/thrift/concurrency/Mutex.h | 101 +------ .../src/thrift/concurrency/PlatformThreadFactory.h | 48 --- .../src/thrift/concurrency/PosixThreadFactory.cpp | 335 --------------------- .../src/thrift/concurrency/PosixThreadFactory.h | 129 -------- lib/cpp/src/thrift/concurrency/StdMonitor.cpp | 213 ------------- lib/cpp/src/thrift/concurrency/StdMutex.cpp | 71 ----- .../src/thrift/concurrency/StdThreadFactory.cpp | 153 ---------- lib/cpp/src/thrift/concurrency/StdThreadFactory.h | 61 ---- lib/cpp/src/thrift/concurrency/Thread.cpp | 37 +++ lib/cpp/src/thrift/concurrency/Thread.h | 132 ++++---- lib/cpp/src/thrift/concurrency/ThreadFactory.cpp | 40 +++ lib/cpp/src/thrift/concurrency/ThreadFactory.h | 81 +++++ lib/cpp/src/thrift/concurrency/ThreadManager.h | 2 +- lib/cpp/src/thrift/concurrency/TimerManager.h | 2 +- lib/cpp/src/thrift/server/TNonblockingServer.cpp | 9 +- lib/cpp/src/thrift/server/TNonblockingServer.h | 10 +- lib/cpp/src/thrift/server/TThreadedServer.cpp | 2 +- lib/cpp/src/thrift/server/TThreadedServer.h | 10 +- lib/cpp/src/thrift/transport/TFileTransport.h | 4 +- lib/cpp/src/thrift/windows/config.h | 5 - lib/cpp/test/CMakeLists.txt | 5 - lib/cpp/test/Makefile.am | 4 - lib/cpp/test/TNonblockingSSLServerTest.cpp | 7 +- lib/cpp/test/TNonblockingServerTest.cpp | 9 +- lib/cpp/test/TServerIntegrationTest.cpp | 8 +- lib/cpp/test/TransportTest.cpp | 2 +- lib/cpp/test/concurrency/MutexTest.cpp | 123 -------- lib/cpp/test/concurrency/RWMutexStarveTest.cpp | 157 ---------- lib/cpp/test/concurrency/ThreadFactoryTests.h | 10 +- lib/cpp/test/concurrency/ThreadManagerTests.h | 59 +--- lib/cpp/test/concurrency/TimerManagerTests.h | 12 +- lib/cpp/test/processor/ProcessorTest.cpp | 6 +- lib/cpp/test/processor/ServerThread.cpp | 4 +- 37 files changed, 371 insertions(+), 2018 deletions(-) delete mode 100644 lib/cpp/src/thrift/concurrency/PlatformThreadFactory.h delete mode 100644 lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp delete mode 100644 lib/cpp/src/thrift/concurrency/PosixThreadFactory.h delete mode 100644 lib/cpp/src/thrift/concurrency/StdMonitor.cpp delete mode 100644 lib/cpp/src/thrift/concurrency/StdMutex.cpp delete mode 100644 lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp delete mode 100644 lib/cpp/src/thrift/concurrency/StdThreadFactory.h create mode 100644 lib/cpp/src/thrift/concurrency/Thread.cpp create mode 100644 lib/cpp/src/thrift/concurrency/ThreadFactory.cpp create mode 100644 lib/cpp/src/thrift/concurrency/ThreadFactory.h delete mode 100644 lib/cpp/test/concurrency/MutexTest.cpp delete mode 100644 lib/cpp/test/concurrency/RWMutexStarveTest.cpp (limited to 'lib') diff --git a/lib/cpp/CMakeLists.txt b/lib/cpp/CMakeLists.txt index 9e36665bd..e12c08ce2 100755 --- a/lib/cpp/CMakeLists.txt +++ b/lib/cpp/CMakeLists.txt @@ -105,32 +105,19 @@ if(OPENSSL_FOUND AND WITH_OPENSSL) list(APPEND SYSLIBS "${OPENSSL_LIBRARIES}") endif() -# WITH_*THREADS selects which threading library to use -if(UNIX AND NOT WITH_STDTHREADS) +if(UNIX) if(ANDROID) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread") else() list(APPEND SYSLIBS pthread) endif() - set( thriftcpp_threads_SOURCES - src/thrift/concurrency/PosixThreadFactory.cpp - src/thrift/concurrency/Mutex.cpp - src/thrift/concurrency/Monitor.cpp - ) -else() - if(UNIX) - if(ANDROID) - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread") - else() - list(APPEND SYSLIBS pthread) - endif() - endif() - set( thriftcpp_threads_SOURCES - src/thrift/concurrency/StdThreadFactory.cpp - src/thrift/concurrency/StdMutex.cpp - src/thrift/concurrency/StdMonitor.cpp - ) endif() +set( thriftcpp_threads_SOURCES + src/thrift/concurrency/ThreadFactory.cpp + src/thrift/concurrency/Thread.cpp + src/thrift/concurrency/Monitor.cpp + src/thrift/concurrency/Mutex.cpp +) # Thrift non blocking server set( thriftcppnb_SOURCES diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am index 85bb9abf8..19bedd753 100755 --- a/lib/cpp/Makefile.am +++ b/lib/cpp/Makefile.am @@ -107,8 +107,9 @@ libthrift_la_SOURCES = src/thrift/TApplicationException.cpp \ src/thrift/server/TThreadedServer.cpp libthrift_la_SOURCES += src/thrift/concurrency/Mutex.cpp \ - src/thrift/concurrency/Monitor.cpp \ - src/thrift/concurrency/PosixThreadFactory.cpp + src/thrift/concurrency/ThreadFactory.cpp \ + src/thrift/concurrency/Thread.cpp \ + src/thrift/concurrency/Monitor.cpp libthriftnb_la_SOURCES = src/thrift/server/TNonblockingServer.cpp \ src/thrift/async/TEvhttpServer.cpp \ @@ -166,12 +167,7 @@ include_concurrency_HEADERS = \ src/thrift/concurrency/Exception.h \ src/thrift/concurrency/Mutex.h \ src/thrift/concurrency/Monitor.h \ - src/thrift/concurrency/PlatformThreadFactory.h \ - src/thrift/concurrency/PosixThreadFactory.h \ - src/thrift/concurrency/StdMonitor.cpp \ - src/thrift/concurrency/StdMutex.cpp \ - src/thrift/concurrency/StdThreadFactory.cpp \ - src/thrift/concurrency/StdThreadFactory.h \ + src/thrift/concurrency/ThreadFactory.h \ src/thrift/concurrency/Thread.h \ src/thrift/concurrency/ThreadManager.h \ src/thrift/concurrency/TimerManager.h \ diff --git a/lib/cpp/src/thrift/concurrency/Monitor.cpp b/lib/cpp/src/thrift/concurrency/Monitor.cpp index 9570cc691..7b3b209a7 100644 --- a/lib/cpp/src/thrift/concurrency/Monitor.cpp +++ b/lib/cpp/src/thrift/concurrency/Monitor.cpp @@ -23,43 +23,36 @@ #include #include #include -#include - #include -#include - -#include +#include +#include +#include +#include namespace apache { namespace thrift { - -using std::unique_ptr; -using std::shared_ptr; - namespace concurrency { /** - * Monitor implementation using the POSIX pthread library + * Monitor implementation using the std thread library * * @version $Id:$ */ class Monitor::Impl { public: - Impl() : ownedMutex_(new Mutex()), mutex_(NULL), condInitialized_(false) { - init(ownedMutex_.get()); - } - - Impl(Mutex* mutex) : mutex_(NULL), condInitialized_(false) { init(mutex); } + Impl() : ownedMutex_(new Mutex()), conditionVariable_(), mutex_(NULL) { init(ownedMutex_.get()); } - Impl(Monitor* monitor) : mutex_(NULL), condInitialized_(false) { init(&(monitor->mutex())); } + Impl(Mutex* mutex) : ownedMutex_(), conditionVariable_(), mutex_(NULL) { init(mutex); } - ~Impl() { cleanup(); } + Impl(Monitor* monitor) : ownedMutex_(), conditionVariable_(), mutex_(NULL) { + init(&(monitor->mutex())); + } Mutex& mutex() { return *mutex_; } - void lock() { mutex().lock(); } - void unlock() { mutex().unlock(); } + void lock() { mutex_->lock(); } + void unlock() { mutex_->unlock(); } /** * Exception-throwing version of waitForTimeRelative(), called simply @@ -68,15 +61,12 @@ public: * If the condition occurs, this function returns cleanly; on timeout or * error an exception is thrown. */ - void wait(int64_t timeout_ms) const { + void wait(int64_t timeout_ms) { int result = waitForTimeRelative(timeout_ms); if (result == THRIFT_ETIMEDOUT) { - // pthread_cond_timedwait has been observed to return early on - // various platforms, so comment out this assert. - // assert(Util::currentTime() >= (now + timeout)); throw TimedOutException(); } else if (result != 0) { - throw TException("pthread_cond_wait() or pthread_cond_timedwait() failed"); + throw TException("Monitor::wait() failed"); } } @@ -86,88 +76,86 @@ public: * * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. */ - int waitForTimeRelative(int64_t timeout_ms) const { + int waitForTimeRelative(int64_t timeout_ms) { if (timeout_ms == 0LL) { return waitForever(); } - struct THRIFT_TIMESPEC abstime; - Util::toTimespec(abstime, Util::currentTime() + timeout_ms); - return waitForTime(&abstime); + assert(mutex_); + std::timed_mutex* mutexImpl = static_cast(mutex_->getUnderlyingImpl()); + assert(mutexImpl); + + std::unique_lock lock(*mutexImpl, std::adopt_lock); + bool timedout = (conditionVariable_.wait_for(lock, std::chrono::milliseconds(timeout_ms)) + == std::cv_status::timeout); + lock.release(); + return (timedout ? THRIFT_ETIMEDOUT : 0); } /** * Waits until the absolute time specified using struct THRIFT_TIMESPEC. * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. */ - int waitForTime(const THRIFT_TIMESPEC* abstime) const { + int waitForTime(const THRIFT_TIMESPEC* abstime) { + struct timeval temp; + temp.tv_sec = static_cast(abstime->tv_sec); + temp.tv_usec = static_cast(abstime->tv_nsec) / 1000; + return waitForTime(&temp); + } + + /** + * Waits until the absolute time specified using struct timeval. + * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. + */ + int waitForTime(const struct timeval* abstime) { assert(mutex_); - pthread_mutex_t* mutexImpl = reinterpret_cast(mutex_->getUnderlyingImpl()); + std::timed_mutex* mutexImpl = static_cast(mutex_->getUnderlyingImpl()); assert(mutexImpl); - // XXX Need to assert that caller owns mutex - return pthread_cond_timedwait(&pthread_cond_, mutexImpl, abstime); + struct timeval currenttime; + Util::toTimeval(currenttime, Util::currentTime()); + + long tv_sec = static_cast(abstime->tv_sec - currenttime.tv_sec); + long tv_usec = static_cast(abstime->tv_usec - currenttime.tv_usec); + if (tv_sec < 0) + tv_sec = 0; + if (tv_usec < 0) + tv_usec = 0; + + std::unique_lock lock(*mutexImpl, std::adopt_lock); + bool timedout = (conditionVariable_.wait_for(lock, + std::chrono::seconds(tv_sec) + + std::chrono::microseconds(tv_usec)) + == std::cv_status::timeout); + lock.release(); + return (timedout ? THRIFT_ETIMEDOUT : 0); } - int waitForTime(const struct timeval* abstime) const { - struct THRIFT_TIMESPEC temp; - temp.tv_sec = abstime->tv_sec; - temp.tv_nsec = abstime->tv_usec * 1000; - return waitForTime(&temp); - } /** * Waits forever until the condition occurs. * Returns 0 if condition occurs, or an error code otherwise. */ - int waitForever() const { + int waitForever() { assert(mutex_); - pthread_mutex_t* mutexImpl = reinterpret_cast(mutex_->getUnderlyingImpl()); + std::timed_mutex* mutexImpl = static_cast(mutex_->getUnderlyingImpl()); assert(mutexImpl); - return pthread_cond_wait(&pthread_cond_, mutexImpl); - } - void notify() { - // XXX Need to assert that caller owns mutex - int iret = pthread_cond_signal(&pthread_cond_); - THRIFT_UNUSED_VARIABLE(iret); - assert(iret == 0); + std::unique_lock lock(*mutexImpl, std::adopt_lock); + conditionVariable_.wait(lock); + lock.release(); + return 0; } - void notifyAll() { - // XXX Need to assert that caller owns mutex - int iret = pthread_cond_broadcast(&pthread_cond_); - THRIFT_UNUSED_VARIABLE(iret); - assert(iret == 0); - } - -private: - void init(Mutex* mutex) { - mutex_ = mutex; + void notify() { conditionVariable_.notify_one(); } - if (pthread_cond_init(&pthread_cond_, NULL) == 0) { - condInitialized_ = true; - } + void notifyAll() { conditionVariable_.notify_all(); } - if (!condInitialized_) { - cleanup(); - throw SystemResourceException(); - } - } - - void cleanup() { - if (condInitialized_) { - condInitialized_ = false; - int iret = pthread_cond_destroy(&pthread_cond_); - THRIFT_UNUSED_VARIABLE(iret); - assert(iret == 0); - } - } +private: + void init(Mutex* mutex) { mutex_ = mutex; } - unique_ptr ownedMutex_; + const std::unique_ptr ownedMutex_; + std::condition_variable_any conditionVariable_; Mutex* mutex_; - - mutable pthread_cond_t pthread_cond_; - mutable bool condInitialized_; }; Monitor::Monitor() : impl_(new Monitor::Impl()) { @@ -182,43 +170,43 @@ Monitor::~Monitor() { } Mutex& Monitor::mutex() const { - return impl_->mutex(); + return const_cast(impl_)->mutex(); } void Monitor::lock() const { - impl_->lock(); + const_cast(impl_)->lock(); } void Monitor::unlock() const { - impl_->unlock(); + const_cast(impl_)->unlock(); } void Monitor::wait(int64_t timeout) const { - impl_->wait(timeout); + const_cast(impl_)->wait(timeout); } int Monitor::waitForTime(const THRIFT_TIMESPEC* abstime) const { - return impl_->waitForTime(abstime); + return const_cast(impl_)->waitForTime(abstime); } int Monitor::waitForTime(const timeval* abstime) const { - return impl_->waitForTime(abstime); + return const_cast(impl_)->waitForTime(abstime); } int Monitor::waitForTimeRelative(int64_t timeout_ms) const { - return impl_->waitForTimeRelative(timeout_ms); + return const_cast(impl_)->waitForTimeRelative(timeout_ms); } int Monitor::waitForever() const { - return impl_->waitForever(); + return const_cast(impl_)->waitForever(); } void Monitor::notify() const { - impl_->notify(); + const_cast(impl_)->notify(); } void Monitor::notifyAll() const { - impl_->notifyAll(); + const_cast(impl_)->notifyAll(); } } } diff --git a/lib/cpp/src/thrift/concurrency/Mutex.cpp b/lib/cpp/src/thrift/concurrency/Mutex.cpp index a5264617d..75802835d 100644 --- a/lib/cpp/src/thrift/concurrency/Mutex.cpp +++ b/lib/cpp/src/thrift/concurrency/Mutex.cpp @@ -17,202 +17,29 @@ * under the License. */ -// needed to test for pthread implementation capabilities: -#define __USE_GNU - -#include - -#include -#include #include -#include - -#include -#include -#include -#include -#include -#include +#include +#include namespace apache { namespace thrift { namespace concurrency { -// Enable this to turn on mutex contention profiling support -// #define THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING - -#ifdef THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING - -static int32_t mutexProfilingCounter = 0; -static int32_t mutexProfilingSampleRate = 0; -static MutexWaitCallback mutexProfilingCallback = 0; - -void enableMutexProfiling(int32_t profilingSampleRate, MutexWaitCallback callback) { - mutexProfilingSampleRate = profilingSampleRate; - mutexProfilingCallback = callback; -} - -#define PROFILE_MUTEX_START_LOCK() int64_t _lock_startTime = maybeGetProfilingStartTime(); - -#define PROFILE_MUTEX_NOT_LOCKED() \ - do { \ - if (_lock_startTime > 0) { \ - int64_t endTime = Util::currentTimeUsec(); \ - (*mutexProfilingCallback)(this, endTime - _lock_startTime); \ - } \ - } while (0) - -#define PROFILE_MUTEX_LOCKED() \ - do { \ - profileTime_ = _lock_startTime; \ - if (profileTime_ > 0) { \ - profileTime_ = Util::currentTimeUsec() - profileTime_; \ - } \ - } while (0) - -#define PROFILE_MUTEX_START_UNLOCK() \ - int64_t _temp_profileTime = profileTime_; \ - profileTime_ = 0; - -#define PROFILE_MUTEX_UNLOCKED() \ - do { \ - if (_temp_profileTime > 0) { \ - (*mutexProfilingCallback)(this, _temp_profileTime); \ - } \ - } while (0) - -static inline int64_t maybeGetProfilingStartTime() { - if (mutexProfilingSampleRate && mutexProfilingCallback) { - // This block is unsynchronized, but should produce a reasonable sampling - // rate on most architectures. The main race conditions are the gap - // between the decrement and the test, the non-atomicity of decrement, and - // potential caching of different values at different CPUs. - // - // - if two decrements race, the likeliest result is that the counter - // decrements slowly (perhaps much more slowly) than intended. - // - // - many threads could potentially decrement before resetting the counter - // to its large value, causing each additional incoming thread to - // profile every call. This situation is unlikely to persist for long - // as the critical gap is quite short, but profiling could be bursty. - sig_atomic_t localValue = --mutexProfilingCounter; - if (localValue <= 0) { - mutexProfilingCounter = mutexProfilingSampleRate; - return Util::currentTimeUsec(); - } - } - - return 0; -} - -#else -#define PROFILE_MUTEX_START_LOCK() -#define PROFILE_MUTEX_NOT_LOCKED() -#define PROFILE_MUTEX_LOCKED() -#define PROFILE_MUTEX_START_UNLOCK() -#define PROFILE_MUTEX_UNLOCKED() -#endif // THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING - -#define EINTR_LOOP(_CALL) int ret; do { ret = _CALL; } while (ret == EINTR) -#define ABORT_ONFAIL(_CALL) { EINTR_LOOP(_CALL); if (ret) { abort(); } } -#define THROW_SRE(_CALLSTR, RET) { throw SystemResourceException(boost::str(boost::format("%1% returned %2% (%3%)") % _CALLSTR % RET % ::strerror(RET))); } -#define THROW_SRE_ONFAIL(_CALL) { EINTR_LOOP(_CALL); if (ret) { THROW_SRE(#_CALL, ret); } } -#define THROW_SRE_TRYFAIL(_CALL) { EINTR_LOOP(_CALL); if (ret == 0) { return true; } else if (ret == EBUSY) { return false; } THROW_SRE(#_CALL, ret); } - /** - * Implementation of Mutex class using POSIX mutex + * Implementation of Mutex class using C++11 std::timed_mutex * - * Throws apache::thrift::concurrency::SystemResourceException on error. + * Methods throw std::system_error on error. * * @version $Id:$ */ -class Mutex::impl { -public: - impl(Initializer init) : initialized_(false) { -#ifdef THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING - profileTime_ = 0; -#endif - init(&pthread_mutex_); - initialized_ = true; - } +class Mutex::impl : public std::timed_mutex {}; - ~impl() { - if (initialized_) { - initialized_ = false; - ABORT_ONFAIL(pthread_mutex_destroy(&pthread_mutex_)); - } - } - - void lock() const { - PROFILE_MUTEX_START_LOCK(); - THROW_SRE_ONFAIL(pthread_mutex_lock(&pthread_mutex_)); - PROFILE_MUTEX_LOCKED(); - } - - bool trylock() const { - THROW_SRE_TRYFAIL(pthread_mutex_trylock(&pthread_mutex_)); - } - - bool timedlock(int64_t milliseconds) const { -#if defined(_POSIX_TIMEOUTS) && _POSIX_TIMEOUTS >= 200112L - PROFILE_MUTEX_START_LOCK(); - - struct THRIFT_TIMESPEC ts; - Util::toTimespec(ts, milliseconds + Util::currentTime()); - EINTR_LOOP(pthread_mutex_timedlock(&pthread_mutex_, &ts)); - if (ret == 0) { - PROFILE_MUTEX_LOCKED(); - return true; - } else if (ret == ETIMEDOUT) { - PROFILE_MUTEX_NOT_LOCKED(); - return false; - } - - THROW_SRE("pthread_mutex_timedlock(&pthread_mutex_, &ts)", ret); -#else - /* Otherwise follow solution used by Mono for Android */ - struct THRIFT_TIMESPEC sleepytime, now, to; - - /* This is just to avoid a completely busy wait */ - sleepytime.tv_sec = 0; - sleepytime.tv_nsec = 10000000L; /* 10ms */ - - Util::toTimespec(to, milliseconds + Util::currentTime()); - - while ((trylock()) == false) { - Util::toTimespec(now, Util::currentTime()); - if (now.tv_sec >= to.tv_sec && now.tv_nsec >= to.tv_nsec) { - return false; - } - nanosleep(&sleepytime, NULL); - } - - return true; -#endif - } - - void unlock() const { - PROFILE_MUTEX_START_UNLOCK(); - THROW_SRE_ONFAIL(pthread_mutex_unlock(&pthread_mutex_)); - PROFILE_MUTEX_UNLOCKED(); - } - - void* getUnderlyingImpl() const { return (void*)&pthread_mutex_; } - -private: - mutable pthread_mutex_t pthread_mutex_; - mutable bool initialized_; -#ifdef THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING - mutable int64_t profileTime_; -#endif -}; - -Mutex::Mutex(Initializer init) : impl_(new Mutex::impl(init)) { +Mutex::Mutex() : impl_(new Mutex::impl()) { } void* Mutex::getUnderlyingImpl() const { - return impl_->getUnderlyingImpl(); + return impl_.get(); } void Mutex::lock() const { @@ -220,161 +47,17 @@ void Mutex::lock() const { } bool Mutex::trylock() const { - return impl_->trylock(); + return impl_->try_lock(); } bool Mutex::timedlock(int64_t ms) const { - return impl_->timedlock(ms); + return impl_->try_lock_for(std::chrono::milliseconds(ms)); } void Mutex::unlock() const { impl_->unlock(); } -void Mutex::DEFAULT_INITIALIZER(void* arg) { - pthread_mutex_t* pthread_mutex = (pthread_mutex_t*)arg; - THROW_SRE_ONFAIL(pthread_mutex_init(pthread_mutex, NULL)); -} - -#if defined(PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) || defined(PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP) || defined(PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP) -static void init_with_kind(pthread_mutex_t* mutex, int kind) { - pthread_mutexattr_t mutexattr; - THROW_SRE_ONFAIL(pthread_mutexattr_init(&mutexattr)); - THROW_SRE_ONFAIL(pthread_mutexattr_settype(&mutexattr, kind)); - THROW_SRE_ONFAIL(pthread_mutex_init(mutex, &mutexattr)); - THROW_SRE_ONFAIL(pthread_mutexattr_destroy(&mutexattr)); -} -#endif - -#ifdef PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP -void Mutex::ADAPTIVE_INITIALIZER(void* arg) { - // From mysql source: mysys/my_thr_init.c - // Set mutex type to "fast" a.k.a "adaptive" - // - // In this case the thread may steal the mutex from some other thread - // that is waiting for the same mutex. This will save us some - // context switches but may cause a thread to 'starve forever' while - // waiting for the mutex (not likely if the code within the mutex is - // short). - init_with_kind((pthread_mutex_t*)arg, PTHREAD_MUTEX_ADAPTIVE_NP); -} -#endif - -#ifdef PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP -void Mutex::ERRORCHECK_INITIALIZER(void* arg) { - init_with_kind((pthread_mutex_t*)arg, PTHREAD_MUTEX_ERRORCHECK); -} -#endif - -#ifdef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP -void Mutex::RECURSIVE_INITIALIZER(void* arg) { - init_with_kind((pthread_mutex_t*)arg, PTHREAD_MUTEX_RECURSIVE_NP); -} -#endif - -/** - * Implementation of ReadWriteMutex class using POSIX rw lock - * - * @version $Id:$ - */ -class ReadWriteMutex::impl { -public: - impl() : initialized_(false) { -#ifdef THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING - profileTime_ = 0; -#endif - THROW_SRE_ONFAIL(pthread_rwlock_init(&rw_lock_, NULL)); - initialized_ = true; - } - - ~impl() { - if (initialized_) { - initialized_ = false; - ABORT_ONFAIL(pthread_rwlock_destroy(&rw_lock_)); - } - } - - void acquireRead() const { - PROFILE_MUTEX_START_LOCK(); - THROW_SRE_ONFAIL(pthread_rwlock_rdlock(&rw_lock_)); - PROFILE_MUTEX_NOT_LOCKED(); // not exclusive, so use not-locked path - } - - void acquireWrite() const { - PROFILE_MUTEX_START_LOCK(); - THROW_SRE_ONFAIL(pthread_rwlock_wrlock(&rw_lock_)); - PROFILE_MUTEX_LOCKED(); - } - - bool attemptRead() const { THROW_SRE_TRYFAIL(pthread_rwlock_tryrdlock(&rw_lock_)); } - - bool attemptWrite() const { THROW_SRE_TRYFAIL(pthread_rwlock_trywrlock(&rw_lock_)); } - - void release() const { - PROFILE_MUTEX_START_UNLOCK(); - THROW_SRE_ONFAIL(pthread_rwlock_unlock(&rw_lock_)); - PROFILE_MUTEX_UNLOCKED(); - } - -private: - mutable pthread_rwlock_t rw_lock_; - mutable bool initialized_; -#ifdef THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING - mutable int64_t profileTime_; -#endif -}; - -ReadWriteMutex::ReadWriteMutex() : impl_(new ReadWriteMutex::impl()) { -} - -void ReadWriteMutex::acquireRead() const { - impl_->acquireRead(); -} - -void ReadWriteMutex::acquireWrite() const { - impl_->acquireWrite(); -} - -bool ReadWriteMutex::attemptRead() const { - return impl_->attemptRead(); -} - -bool ReadWriteMutex::attemptWrite() const { - return impl_->attemptWrite(); -} - -void ReadWriteMutex::release() const { - impl_->release(); -} - -NoStarveReadWriteMutex::NoStarveReadWriteMutex() : writerWaiting_(false) { -} - -void NoStarveReadWriteMutex::acquireRead() const { - if (writerWaiting_) { - // writer is waiting, block on the writer's mutex until he's done with it - mutex_.lock(); - mutex_.unlock(); - } - - ReadWriteMutex::acquireRead(); -} - -void NoStarveReadWriteMutex::acquireWrite() const { - // if we can acquire the rwlock the easy way, we're done - if (attemptWrite()) { - return; - } - - // failed to get the rwlock, do it the hard way: - // locking the mutex and setting writerWaiting will cause all new readers to - // block on the mutex rather than on the rwlock. - mutex_.lock(); - writerWaiting_ = true; - ReadWriteMutex::acquireWrite(); - writerWaiting_ = false; - mutex_.unlock(); -} } } } // apache::thrift::concurrency diff --git a/lib/cpp/src/thrift/concurrency/Mutex.h b/lib/cpp/src/thrift/concurrency/Mutex.h index a1f539685..123ae785d 100644 --- a/lib/cpp/src/thrift/concurrency/Mutex.h +++ b/lib/cpp/src/thrift/concurrency/Mutex.h @@ -28,31 +28,6 @@ namespace apache { namespace thrift { namespace concurrency { -#ifndef THRIFT_NO_CONTENTION_PROFILING - -/** - * Determines if the Thrift Mutex and ReadWriteMutex classes will attempt to - * profile their blocking acquire methods. If this value is set to non-zero, - * Thrift will attempt to invoke the callback once every profilingSampleRate - * times. However, as the sampling is not synchronized the rate is not - * guranateed, and could be subject to big bursts and swings. Please ensure - * your sampling callback is as performant as your application requires. - * - * The callback will get called with the wait time taken to lock the mutex in - * usec and a (void*) that uniquely identifies the Mutex (or ReadWriteMutex) - * being locked. - * - * The enableMutexProfiling() function is unsynchronized; calling this function - * while profiling is already enabled may result in race conditions. On - * architectures where a pointer assignment is atomic, this is safe but there - * is no guarantee threads will agree on a single callback within any - * particular time period. - */ -typedef void (*MutexWaitCallback)(const void* id, int64_t waitTimeMicros); -void enableMutexProfiling(int32_t profilingSampleRate, MutexWaitCallback callback); - -#endif - /** * NOTE: All mutex implementations throw an exception on failure. See each * specific implementation to understand the exception type(s) used. @@ -65,9 +40,7 @@ void enableMutexProfiling(int32_t profilingSampleRate, MutexWaitCallback callbac */ class Mutex { public: - typedef void (*Initializer)(void*); - - Mutex(Initializer init = DEFAULT_INITIALIZER); + Mutex(); virtual ~Mutex() {} virtual void lock() const; @@ -77,57 +50,11 @@ public: void* getUnderlyingImpl() const; - // If you attempt to use one of these and it fails to link, it means - // your version of pthreads does not support it - try another one. - static void ADAPTIVE_INITIALIZER(void*); - static void DEFAULT_INITIALIZER(void*); - static void ERRORCHECK_INITIALIZER(void*); - static void RECURSIVE_INITIALIZER(void*); - -private: - class impl; - std::shared_ptr impl_; -}; - -class ReadWriteMutex { -public: - ReadWriteMutex(); - virtual ~ReadWriteMutex() {} - - // these get the lock and block until it is done successfully - virtual void acquireRead() const; - virtual void acquireWrite() const; - - // these attempt to get the lock, returning false immediately if they fail - virtual bool attemptRead() const; - virtual bool attemptWrite() const; - - // this releases both read and write locks - virtual void release() const; - private: class impl; std::shared_ptr impl_; }; -/** - * A ReadWriteMutex that guarantees writers will not be starved by readers: - * When a writer attempts to acquire the mutex, all new readers will be - * blocked from acquiring the mutex until the writer has acquired and - * released it. In some operating systems, this may already be guaranteed - * by a regular ReadWriteMutex. - */ -class NoStarveReadWriteMutex : public ReadWriteMutex { -public: - NoStarveReadWriteMutex(); - - virtual void acquireRead() const; - virtual void acquireWrite() const; - -private: - Mutex mutex_; - mutable volatile bool writerWaiting_; -}; class Guard : boost::noncopyable { public: @@ -156,32 +83,6 @@ private: const Mutex* mutex_; }; -// Can be used as second argument to RWGuard to make code more readable -// as to whether we're doing acquireRead() or acquireWrite(). -enum RWGuardType { RW_READ = 0, RW_WRITE = 1 }; - -class RWGuard : boost::noncopyable { -public: - RWGuard(const ReadWriteMutex& value, bool write = false) : rw_mutex_(value) { - if (write) { - rw_mutex_.acquireWrite(); - } else { - rw_mutex_.acquireRead(); - } - } - - RWGuard(const ReadWriteMutex& value, RWGuardType type) : rw_mutex_(value) { - if (type == RW_WRITE) { - rw_mutex_.acquireWrite(); - } else { - rw_mutex_.acquireRead(); - } - } - ~RWGuard() { rw_mutex_.release(); } - -private: - const ReadWriteMutex& rw_mutex_; -}; } } } // apache::thrift::concurrency diff --git a/lib/cpp/src/thrift/concurrency/PlatformThreadFactory.h b/lib/cpp/src/thrift/concurrency/PlatformThreadFactory.h deleted file mode 100644 index 99b44033b..000000000 --- a/lib/cpp/src/thrift/concurrency/PlatformThreadFactory.h +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_ -#define _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_ 1 - -// clang-format off -#include -#if USE_STD_THREAD -# include -#else -# include -#endif -// clang-format on - -namespace apache { -namespace thrift { -namespace concurrency { - -// clang-format off -#if USE_STD_THREAD - typedef StdThreadFactory PlatformThreadFactory; -#else - typedef PosixThreadFactory PlatformThreadFactory; -#endif -// clang-format on - -} -} -} // apache::thrift::concurrency - -#endif // #ifndef _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_ diff --git a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp deleted file mode 100644 index 5c5926977..000000000 --- a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp +++ /dev/null @@ -1,335 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include - -#include -#include -#include - -#if GOOGLE_PERFTOOLS_REGISTER_THREAD -#include -#endif - -#include -#include - -#include - -#include - -namespace apache { -namespace thrift { -namespace concurrency { - -/** - * The POSIX thread class. - * - * @version $Id:$ - */ -class PthreadThread : public Thread { -public: - enum STATE { uninitialized, starting, started, stopping, stopped }; - - static const int MB = 1024 * 1024; - - static void* threadMain(void* arg); - -private: - pthread_t pthread_; - Monitor monitor_; // guard to protect state_ and also notification - STATE state_; // to protect proper thread start behavior - int policy_; - int priority_; - int stackSize_; - std::weak_ptr self_; - bool detached_; - -public: - PthreadThread(int policy, - int priority, - int stackSize, - bool detached, - std::shared_ptr runnable) - : - -#ifndef _WIN32 - pthread_(0), -#endif // _WIN32 - state_(uninitialized), - policy_(policy), - priority_(priority), - stackSize_(stackSize), - detached_(detached) { - - this->Thread::runnable(runnable); - } - - ~PthreadThread() { - /* Nothing references this thread, if is is not detached, do a join - now, otherwise the thread-id and, possibly, other resources will - be leaked. */ - if (!detached_) { - try { - join(); - } catch (...) { - // We're really hosed. - } - } - } - - STATE getState() const - { - Synchronized sync(monitor_); - return state_; - } - - void setState(STATE newState) - { - Synchronized sync(monitor_); - state_ = newState; - - // unblock start() with the knowledge that the thread has actually - // started running, which avoids a race in detached threads. - if (newState == started) { - monitor_.notify(); - } - } - - void start() { - if (getState() != uninitialized) { - return; - } - - pthread_attr_t thread_attr; - if (pthread_attr_init(&thread_attr) != 0) { - throw SystemResourceException("pthread_attr_init failed"); - } - - if (pthread_attr_setdetachstate(&thread_attr, - detached_ ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE) - != 0) { - throw SystemResourceException("pthread_attr_setdetachstate failed"); - } - - // Set thread stack size - if (pthread_attr_setstacksize(&thread_attr, MB * stackSize_) != 0) { - throw SystemResourceException("pthread_attr_setstacksize failed"); - } - -// Set thread policy -#ifdef _WIN32 - // WIN32 Pthread implementation doesn't seem to support sheduling policies other then - // PosixThreadFactory::OTHER - runtime error - policy_ = PosixThreadFactory::OTHER; -#endif - -#if _POSIX_THREAD_PRIORITY_SCHEDULING > 0 - if (pthread_attr_setschedpolicy(&thread_attr, policy_) != 0) { - throw SystemResourceException("pthread_attr_setschedpolicy failed"); - } -#endif - - struct sched_param sched_param; - sched_param.sched_priority = priority_; - - // Set thread priority - if (pthread_attr_setschedparam(&thread_attr, &sched_param) != 0) { - throw SystemResourceException("pthread_attr_setschedparam failed"); - } - - // Create reference - std::shared_ptr* selfRef = new std::shared_ptr(); - *selfRef = self_.lock(); - - setState(starting); - - Synchronized sync(monitor_); - - if (pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) { - throw SystemResourceException("pthread_create failed"); - } - - // The caller may not choose to guarantee the scope of the Runnable - // being used in the thread, so we must actually wait until the thread - // starts before we return. If we do not wait, it would be possible - // for the caller to start destructing the Runnable and the Thread, - // and we would end up in a race. This was identified with valgrind. - monitor_.wait(); - } - - void join() { - if (!detached_ && getState() != uninitialized) { - void* ignore; - /* XXX - If join fails it is most likely due to the fact - that the last reference was the thread itself and cannot - join. This results in leaked threads and will eventually - cause the process to run out of thread resources. - We're beyond the point of throwing an exception. Not clear how - best to handle this. */ - int res = pthread_join(pthread_, &ignore); - detached_ = (res == 0); - if (res != 0) { - GlobalOutput.printf("PthreadThread::join(): fail with code %d", res); - } - } - } - - Thread::id_t getId() { - -#ifndef _WIN32 - return (Thread::id_t)pthread_; -#else - return (Thread::id_t)pthread_.p; -#endif // _WIN32 - } - - std::shared_ptr runnable() const { return Thread::runnable(); } - - void runnable(std::shared_ptr value) { Thread::runnable(value); } - - void weakRef(std::shared_ptr self) { - assert(self.get() == this); - self_ = std::weak_ptr(self); - } -}; - -void* PthreadThread::threadMain(void* arg) { - std::shared_ptr thread = *(std::shared_ptr*)arg; - delete reinterpret_cast*>(arg); - -#if GOOGLE_PERFTOOLS_REGISTER_THREAD - ProfilerRegisterThread(); -#endif - - thread->setState(started); - - thread->runnable()->run(); - - STATE _s = thread->getState(); - if (_s != stopping && _s != stopped) { - thread->setState(stopping); - } - - return (void*)0; -} - -/** - * Converts generic posix thread schedule policy enums into pthread - * API values. - */ -static int toPthreadPolicy(PosixThreadFactory::POLICY policy) { - switch (policy) { - case PosixThreadFactory::OTHER: - return SCHED_OTHER; - case PosixThreadFactory::FIFO: - return SCHED_FIFO; - case PosixThreadFactory::ROUND_ROBIN: - return SCHED_RR; - } - return SCHED_OTHER; -} - -/** - * Converts relative thread priorities to absolute value based on posix - * thread scheduler policy - * - * The idea is simply to divide up the priority range for the given policy - * into the correpsonding relative priority level (lowest..highest) and - * then pro-rate accordingly. - */ -static int toPthreadPriority(PosixThreadFactory::POLICY policy, PosixThreadFactory::PRIORITY priority) { - int pthread_policy = toPthreadPolicy(policy); - int min_priority = 0; - int max_priority = 0; -#ifdef HAVE_SCHED_GET_PRIORITY_MIN - min_priority = sched_get_priority_min(pthread_policy); -#endif -#ifdef HAVE_SCHED_GET_PRIORITY_MAX - max_priority = sched_get_priority_max(pthread_policy); -#endif - int quanta = (PosixThreadFactory::HIGHEST - PosixThreadFactory::LOWEST) + 1; - float stepsperquanta = (float)(max_priority - min_priority) / quanta; - - if (priority <= PosixThreadFactory::HIGHEST) { - return (int)(min_priority + stepsperquanta * priority); - } else { - // should never get here for priority increments. - assert(false); - return (int)(min_priority + stepsperquanta * PosixThreadFactory::NORMAL); - } -} - -PosixThreadFactory::PosixThreadFactory(POLICY policy, - PRIORITY priority, - int stackSize, - bool detached) - : ThreadFactory(detached), - policy_(policy), - priority_(priority), - stackSize_(stackSize) { -} - -PosixThreadFactory::PosixThreadFactory(bool detached) - : ThreadFactory(detached), - policy_(ROUND_ROBIN), - priority_(NORMAL), - stackSize_(1) { -} - -std::shared_ptr PosixThreadFactory::newThread(std::shared_ptr runnable) const { - std::shared_ptr result - = std::shared_ptr(new PthreadThread(toPthreadPolicy(policy_), - toPthreadPriority(policy_, priority_), - stackSize_, - isDetached(), - runnable)); - result->weakRef(result); - runnable->thread(result); - return result; -} - -int PosixThreadFactory::getStackSize() const { - return stackSize_; -} - -void PosixThreadFactory::setStackSize(int value) { - stackSize_ = value; -} - -PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const { - return priority_; -} - -void PosixThreadFactory::setPriority(PRIORITY value) { - priority_ = value; -} - -Thread::id_t PosixThreadFactory::getCurrentThreadId() const { -#ifndef _WIN32 - return (Thread::id_t)pthread_self(); -#else - return (Thread::id_t)pthread_self().p; -#endif // _WIN32 -} - -} -} -} // apache::thrift::concurrency diff --git a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h deleted file mode 100644 index cb3b17c9c..000000000 --- a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_ -#define _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_ 1 - -#include - -#include - -namespace apache { -namespace thrift { -namespace concurrency { - -/** - * A thread factory to create posix threads - * - * @version $Id:$ - */ -class PosixThreadFactory : public ThreadFactory { - -public: - /** - * POSIX Thread scheduler policies - */ - enum POLICY { OTHER, FIFO, ROUND_ROBIN }; - - /** - * POSIX Thread scheduler relative priorities, - * - * Absolute priority is determined by scheduler policy and OS. This - * enumeration specifies relative priorities such that one can specify a - * priority within a giving scheduler policy without knowing the absolute - * value of the priority. - */ - enum PRIORITY { - LOWEST = 0, - LOWER = 1, - LOW = 2, - NORMAL = 3, - HIGH = 4, - HIGHER = 5, - HIGHEST = 6, - INCREMENT = 7, - DECREMENT = 8 - }; - - /** - * Posix thread (pthread) factory. All threads created by a factory are reference-counted - * via std::shared_ptr. The factory guarantees that threads and the Runnable tasks - * they host will be properly cleaned up once the last strong reference to both is - * given up. - * - * Threads are created with the specified policy, priority, stack-size and detachable-mode - * detached means the thread is free-running and will release all system resources the - * when it completes. A detachable thread is not joinable. The join method - * of a detachable thread will return immediately with no error. - * - * By default threads are not joinable. - */ - PosixThreadFactory(POLICY policy = ROUND_ROBIN, - PRIORITY priority = NORMAL, - int stackSize = 1, - bool detached = true); - - /** - * Provide a constructor compatible with the other factories - * The default policy is POLICY::ROUND_ROBIN. - * The default priority is PRIORITY::NORMAL. - * The default stackSize is 1. - */ - PosixThreadFactory(bool detached); - - // From ThreadFactory; - std::shared_ptr newThread(std::shared_ptr runnable) const; - - // From ThreadFactory; - Thread::id_t getCurrentThreadId() const; - - /** - * Gets stack size for newly created threads - * - * @return int size in megabytes - */ - virtual int getStackSize() const; - - /** - * Sets stack size for newly created threads - * - * @param value size in megabytes - */ - virtual void setStackSize(int value); - - /** - * Gets priority relative to current policy - */ - virtual PRIORITY getPriority() const; - - /** - * Sets priority relative to current policy - */ - virtual void setPriority(PRIORITY priority); - -private: - POLICY policy_; - PRIORITY priority_; - int stackSize_; -}; -} -} -} // apache::thrift::concurrency - -#endif // #ifndef _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_ diff --git a/lib/cpp/src/thrift/concurrency/StdMonitor.cpp b/lib/cpp/src/thrift/concurrency/StdMonitor.cpp deleted file mode 100644 index 7b3b209a7..000000000 --- a/lib/cpp/src/thrift/concurrency/StdMonitor.cpp +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include - -#include -#include -#include -#include -#include - -#include -#include -#include -#include - -namespace apache { -namespace thrift { -namespace concurrency { - -/** - * Monitor implementation using the std thread library - * - * @version $Id:$ - */ -class Monitor::Impl { - -public: - Impl() : ownedMutex_(new Mutex()), conditionVariable_(), mutex_(NULL) { init(ownedMutex_.get()); } - - Impl(Mutex* mutex) : ownedMutex_(), conditionVariable_(), mutex_(NULL) { init(mutex); } - - Impl(Monitor* monitor) : ownedMutex_(), conditionVariable_(), mutex_(NULL) { - init(&(monitor->mutex())); - } - - Mutex& mutex() { return *mutex_; } - void lock() { mutex_->lock(); } - void unlock() { mutex_->unlock(); } - - /** - * Exception-throwing version of waitForTimeRelative(), called simply - * wait(int64) for historical reasons. Timeout is in milliseconds. - * - * If the condition occurs, this function returns cleanly; on timeout or - * error an exception is thrown. - */ - void wait(int64_t timeout_ms) { - int result = waitForTimeRelative(timeout_ms); - if (result == THRIFT_ETIMEDOUT) { - throw TimedOutException(); - } else if (result != 0) { - throw TException("Monitor::wait() failed"); - } - } - - /** - * Waits until the specified timeout in milliseconds for the condition to - * occur, or waits forever if timeout_ms == 0. - * - * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. - */ - int waitForTimeRelative(int64_t timeout_ms) { - if (timeout_ms == 0LL) { - return waitForever(); - } - - assert(mutex_); - std::timed_mutex* mutexImpl = static_cast(mutex_->getUnderlyingImpl()); - assert(mutexImpl); - - std::unique_lock lock(*mutexImpl, std::adopt_lock); - bool timedout = (conditionVariable_.wait_for(lock, std::chrono::milliseconds(timeout_ms)) - == std::cv_status::timeout); - lock.release(); - return (timedout ? THRIFT_ETIMEDOUT : 0); - } - - /** - * Waits until the absolute time specified using struct THRIFT_TIMESPEC. - * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. - */ - int waitForTime(const THRIFT_TIMESPEC* abstime) { - struct timeval temp; - temp.tv_sec = static_cast(abstime->tv_sec); - temp.tv_usec = static_cast(abstime->tv_nsec) / 1000; - return waitForTime(&temp); - } - - /** - * Waits until the absolute time specified using struct timeval. - * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. - */ - int waitForTime(const struct timeval* abstime) { - assert(mutex_); - std::timed_mutex* mutexImpl = static_cast(mutex_->getUnderlyingImpl()); - assert(mutexImpl); - - struct timeval currenttime; - Util::toTimeval(currenttime, Util::currentTime()); - - long tv_sec = static_cast(abstime->tv_sec - currenttime.tv_sec); - long tv_usec = static_cast(abstime->tv_usec - currenttime.tv_usec); - if (tv_sec < 0) - tv_sec = 0; - if (tv_usec < 0) - tv_usec = 0; - - std::unique_lock lock(*mutexImpl, std::adopt_lock); - bool timedout = (conditionVariable_.wait_for(lock, - std::chrono::seconds(tv_sec) - + std::chrono::microseconds(tv_usec)) - == std::cv_status::timeout); - lock.release(); - return (timedout ? THRIFT_ETIMEDOUT : 0); - } - - /** - * Waits forever until the condition occurs. - * Returns 0 if condition occurs, or an error code otherwise. - */ - int waitForever() { - assert(mutex_); - std::timed_mutex* mutexImpl = static_cast(mutex_->getUnderlyingImpl()); - assert(mutexImpl); - - std::unique_lock lock(*mutexImpl, std::adopt_lock); - conditionVariable_.wait(lock); - lock.release(); - return 0; - } - - void notify() { conditionVariable_.notify_one(); } - - void notifyAll() { conditionVariable_.notify_all(); } - -private: - void init(Mutex* mutex) { mutex_ = mutex; } - - const std::unique_ptr ownedMutex_; - std::condition_variable_any conditionVariable_; - Mutex* mutex_; -}; - -Monitor::Monitor() : impl_(new Monitor::Impl()) { -} -Monitor::Monitor(Mutex* mutex) : impl_(new Monitor::Impl(mutex)) { -} -Monitor::Monitor(Monitor* monitor) : impl_(new Monitor::Impl(monitor)) { -} - -Monitor::~Monitor() { - delete impl_; -} - -Mutex& Monitor::mutex() const { - return const_cast(impl_)->mutex(); -} - -void Monitor::lock() const { - const_cast(impl_)->lock(); -} - -void Monitor::unlock() const { - const_cast(impl_)->unlock(); -} - -void Monitor::wait(int64_t timeout) const { - const_cast(impl_)->wait(timeout); -} - -int Monitor::waitForTime(const THRIFT_TIMESPEC* abstime) const { - return const_cast(impl_)->waitForTime(abstime); -} - -int Monitor::waitForTime(const timeval* abstime) const { - return const_cast(impl_)->waitForTime(abstime); -} - -int Monitor::waitForTimeRelative(int64_t timeout_ms) const { - return const_cast(impl_)->waitForTimeRelative(timeout_ms); -} - -int Monitor::waitForever() const { - return const_cast(impl_)->waitForever(); -} - -void Monitor::notify() const { - const_cast(impl_)->notify(); -} - -void Monitor::notifyAll() const { - const_cast(impl_)->notifyAll(); -} -} -} -} // apache::thrift::concurrency diff --git a/lib/cpp/src/thrift/concurrency/StdMutex.cpp b/lib/cpp/src/thrift/concurrency/StdMutex.cpp deleted file mode 100644 index e0f79fa37..000000000 --- a/lib/cpp/src/thrift/concurrency/StdMutex.cpp +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include - -#include -#include - -#include -#include -#include - -namespace apache { -namespace thrift { -namespace concurrency { - -/** - * Implementation of Mutex class using C++11 std::timed_mutex - * - * Methods throw std::system_error on error. - * - * @version $Id:$ - */ -class Mutex::impl : public std::timed_mutex {}; - -Mutex::Mutex(Initializer init) : impl_(new Mutex::impl()) { - ((void)init); -} - -void* Mutex::getUnderlyingImpl() const { - return impl_.get(); -} - -void Mutex::lock() const { - impl_->lock(); -} - -bool Mutex::trylock() const { - return impl_->try_lock(); -} - -bool Mutex::timedlock(int64_t ms) const { - return impl_->try_lock_for(std::chrono::milliseconds(ms)); -} - -void Mutex::unlock() const { - impl_->unlock(); -} - -void Mutex::DEFAULT_INITIALIZER(void* arg) { - ((void)arg); -} -} -} -} // apache::thrift::concurrency diff --git a/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp b/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp deleted file mode 100644 index c885f3aca..000000000 --- a/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include - -#if USE_STD_THREAD - -#include -#include -#include -#include - -#include -#include - -namespace apache { -namespace thrift { -namespace concurrency { - -/** - * The C++11 thread class. - * - * Note that we use boost shared_ptr rather than std shared_ptrs here - * because the Thread/Runnable classes use those and we don't want to - * mix them. - * - * @version $Id:$ - */ -class StdThread : public Thread, public std::enable_shared_from_this { -public: - enum STATE { uninitialized, starting, started, stopping, stopped }; - - static void threadMain(std::shared_ptr thread); - -private: - std::unique_ptr thread_; - Monitor monitor_; - STATE state_; - bool detached_; - -public: - StdThread(bool detached, std::shared_ptr runnable) - : state_(uninitialized), detached_(detached) { - this->Thread::runnable(runnable); - } - - ~StdThread() { - if (!detached_ && thread_->joinable()) { - try { - join(); - } catch (...) { - // We're really hosed. - } - } - } - - STATE getState() const - { - Synchronized sync(monitor_); - return state_; - } - - void setState(STATE newState) - { - Synchronized sync(monitor_); - state_ = newState; - - // unblock start() with the knowledge that the thread has actually - // started running, which avoids a race in detached threads. - if (newState == started) { - monitor_.notify(); - } - } - - void start() { - if (getState() != uninitialized) { - return; - } - - std::shared_ptr selfRef = shared_from_this(); - setState(starting); - - Synchronized sync(monitor_); - thread_ = std::unique_ptr(new std::thread(threadMain, selfRef)); - - if (detached_) - thread_->detach(); - - // Wait for the thread to start and get far enough to grab everything - // that it needs from the calling context, thus absolving the caller - // from being required to hold on to runnable indefinitely. - monitor_.wait(); - } - - void join() { - if (!detached_ && state_ != uninitialized) { - thread_->join(); - } - } - - Thread::id_t getId() { return thread_.get() ? thread_->get_id() : std::thread::id(); } - - std::shared_ptr runnable() const { return Thread::runnable(); } - - void runnable(std::shared_ptr value) { Thread::runnable(value); } -}; - -void StdThread::threadMain(std::shared_ptr thread) { -#if GOOGLE_PERFTOOLS_REGISTER_THREAD - ProfilerRegisterThread(); -#endif - - thread->setState(started); - thread->runnable()->run(); - - if (thread->getState() != stopping && thread->getState() != stopped) { - thread->setState(stopping); - } -} - -StdThreadFactory::StdThreadFactory(bool detached) : ThreadFactory(detached) { -} - -std::shared_ptr StdThreadFactory::newThread(std::shared_ptr runnable) const { - std::shared_ptr result = std::shared_ptr(new StdThread(isDetached(), runnable)); - runnable->thread(result); - return result; -} - -Thread::id_t StdThreadFactory::getCurrentThreadId() const { - return std::this_thread::get_id(); -} -} -} -} // apache::thrift::concurrency - -#endif // USE_STD_THREAD diff --git a/lib/cpp/src/thrift/concurrency/StdThreadFactory.h b/lib/cpp/src/thrift/concurrency/StdThreadFactory.h deleted file mode 100644 index e74046b7b..000000000 --- a/lib/cpp/src/thrift/concurrency/StdThreadFactory.h +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef _THRIFT_CONCURRENCY_STDTHREADFACTORY_H_ -#define _THRIFT_CONCURRENCY_STDTHREADFACTORY_H_ 1 - -#include - -#include - -namespace apache { -namespace thrift { -namespace concurrency { - -/** - * A thread factory to create std::threads. - * - * @version $Id:$ - */ -class StdThreadFactory : public ThreadFactory { - -public: - /** - * Std thread factory. All threads created by a factory are reference-counted - * via std::shared_ptr. The factory guarantees that threads and the Runnable tasks - * they host will be properly cleaned up once the last strong reference - * to both is given up. - * - * By default threads are not joinable. - */ - - StdThreadFactory(bool detached = true); - - // From ThreadFactory; - std::shared_ptr newThread(std::shared_ptr runnable) const; - - // From ThreadFactory; - Thread::id_t getCurrentThreadId() const; -}; - -} -} -} // apache::thrift::concurrency - -#endif // #ifndef _THRIFT_CONCURRENCY_STDTHREADFACTORY_H_ diff --git a/lib/cpp/src/thrift/concurrency/Thread.cpp b/lib/cpp/src/thrift/concurrency/Thread.cpp new file mode 100644 index 000000000..a2bb1270f --- /dev/null +++ b/lib/cpp/src/thrift/concurrency/Thread.cpp @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +namespace apache { +namespace thrift { +namespace concurrency { + +void Thread::threadMain(std::shared_ptr thread) { + thread->setState(started); + thread->runnable()->run(); + + if (thread->getState() != stopping && thread->getState() != stopped) { + thread->setState(stopping); + } +} + +} +} +} // apache::thrift::concurrency diff --git a/lib/cpp/src/thrift/concurrency/Thread.h b/lib/cpp/src/thrift/concurrency/Thread.h index b2ea4e297..729d11a48 100644 --- a/lib/cpp/src/thrift/concurrency/Thread.h +++ b/lib/cpp/src/thrift/concurrency/Thread.h @@ -22,16 +22,10 @@ #include #include +#include #include - -#if USE_STD_THREAD -#include -#else -#ifdef HAVE_PTHREAD_H -#include -#endif -#endif +#include namespace apache { namespace thrift { @@ -75,94 +69,106 @@ private: * * @see apache::thrift::concurrency::ThreadFactory) */ -class Thread { +class Thread final : public std::enable_shared_from_this { public: -#if USE_STD_THREAD typedef std::thread::id id_t; + enum STATE { uninitialized, starting, started, stopping, stopped }; + + static void threadMain(std::shared_ptr thread); + static inline bool is_current(id_t t) { return t == std::this_thread::get_id(); } static inline id_t get_current() { return std::this_thread::get_id(); } -#else - typedef pthread_t id_t; - - static inline bool is_current(id_t t) { return pthread_equal(pthread_self(), t); } - static inline id_t get_current() { return pthread_self(); } -#endif - virtual ~Thread(){}; + Thread(bool detached, std::shared_ptr runnable) + : state_(uninitialized), detached_(detached) { + this->_runnable = runnable; + } + + ~Thread() { + if (!detached_ && thread_->joinable()) { + try { + join(); + } catch (...) { + // We're really hosed. + } + } + } + + STATE getState() const + { + Synchronized sync(monitor_); + return state_; + } + + void setState(STATE newState) + { + Synchronized sync(monitor_); + state_ = newState; + + // unblock start() with the knowledge that the thread has actually + // started running, which avoids a race in detached threads. + if (newState == started) { + monitor_.notify(); + } + } /** * Starts the thread. Does platform specific thread creation and * configuration then invokes the run method of the Runnable object bound * to this thread. */ - virtual void start() = 0; + void start() { + if (getState() != uninitialized) { + return; + } + + std::shared_ptr selfRef = shared_from_this(); + setState(starting); + + Synchronized sync(monitor_); + thread_ = std::unique_ptr(new std::thread(threadMain, selfRef)); + + if (detached_) + thread_->detach(); + + // Wait for the thread to start and get far enough to grab everything + // that it needs from the calling context, thus absolving the caller + // from being required to hold on to runnable indefinitely. + monitor_.wait(); + } /** * Join this thread. If this thread is joinable, the calling thread blocks * until this thread completes. If the target thread is not joinable, then * nothing happens. */ - virtual void join() = 0; + void join() { + if (!detached_ && state_ != uninitialized) { + thread_->join(); + } + } /** * Gets the thread's platform-specific ID */ - virtual id_t getId() = 0; + Thread::id_t getId() { return thread_.get() ? thread_->get_id() : std::thread::id(); } /** * Gets the runnable object this thread is hosting */ - virtual std::shared_ptr runnable() const { return _runnable; } - -protected: - virtual void runnable(std::shared_ptr value) { _runnable = value; } + std::shared_ptr runnable() const { return _runnable; } private: std::shared_ptr _runnable; -}; - -/** - * Factory to create platform-specific thread object and bind them to Runnable - * object for execution - */ -class ThreadFactory { -protected: - ThreadFactory(bool detached) : detached_(detached) { } - -public: - virtual ~ThreadFactory() { } - - /** - * Gets current detached mode - */ - bool isDetached() const { return detached_; } - - /** - * Sets the detached disposition of newly created threads. - */ - void setDetached(bool detached) { detached_ = detached; } - - /** - * Create a new thread. - */ - virtual std::shared_ptr newThread(std::shared_ptr runnable) const = 0; - - /** - * Gets the current thread id or unknown_thread_id if the current thread is not a thrift thread - */ - virtual Thread::id_t getCurrentThreadId() const = 0; - - /** - * For code readability define the unknown/undefined thread id - */ - static const Thread::id_t unknown_thread_id; - -private: + std::unique_ptr thread_; + Monitor monitor_; + STATE state_; bool detached_; }; + } } } // apache::thrift::concurrency diff --git a/lib/cpp/src/thrift/concurrency/ThreadFactory.cpp b/lib/cpp/src/thrift/concurrency/ThreadFactory.cpp new file mode 100644 index 000000000..941b99371 --- /dev/null +++ b/lib/cpp/src/thrift/concurrency/ThreadFactory.cpp @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include +#include + +namespace apache { +namespace thrift { +namespace concurrency { + +std::shared_ptr ThreadFactory::newThread(std::shared_ptr runnable) const { + std::shared_ptr result = std::shared_ptr(new Thread(isDetached(), runnable)); + runnable->thread(result); + return result; +} + +Thread::id_t ThreadFactory::getCurrentThreadId() const { + return std::this_thread::get_id(); +} +} +} +} // apache::thrift::concurrency diff --git a/lib/cpp/src/thrift/concurrency/ThreadFactory.h b/lib/cpp/src/thrift/concurrency/ThreadFactory.h new file mode 100644 index 000000000..f317afcde --- /dev/null +++ b/lib/cpp/src/thrift/concurrency/ThreadFactory.h @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_CONCURRENCY_THREADFACTORY_H_ +#define _THRIFT_CONCURRENCY_THREADFACTORY_H_ 1 + +#include + +#include +namespace apache { +namespace thrift { +namespace concurrency { + +/** + * Factory to create thread object and bind them to Runnable + * object for execution + */ +class ThreadFactory final { +public: + /** + * All threads created by a factory are reference-counted + * via std::shared_ptr. The factory guarantees that threads and the Runnable tasks + * they host will be properly cleaned up once the last strong reference + * to both is given up. + * + * By default threads are not joinable. + */ + ThreadFactory(bool detached = true) : detached_(detached) { } + + ~ThreadFactory() = default; + + /** + * Gets current detached mode + */ + bool isDetached() const { return detached_; } + + /** + * Sets the detached disposition of newly created threads. + */ + void setDetached(bool detached) { detached_ = detached; } + + /** + * Create a new thread. + */ + std::shared_ptr newThread(std::shared_ptr runnable) const; + + /** + * Gets the current thread id or unknown_thread_id if the current thread is not a thrift thread + */ + Thread::id_t getCurrentThreadId() const; + + /** + * For code readability define the unknown/undefined thread id + */ + static const Thread::id_t unknown_thread_id; + +private: + bool detached_; +}; + +} +} +} // apache::thrift::concurrency + +#endif // #ifndef _THRIFT_CONCURRENCY_THREADFACTORY_H_ diff --git a/lib/cpp/src/thrift/concurrency/ThreadManager.h b/lib/cpp/src/thrift/concurrency/ThreadManager.h index 470fc0aae..4b4b3d491 100644 --- a/lib/cpp/src/thrift/concurrency/ThreadManager.h +++ b/lib/cpp/src/thrift/concurrency/ThreadManager.h @@ -23,7 +23,7 @@ #include #include #include -#include +#include namespace apache { namespace thrift { diff --git a/lib/cpp/src/thrift/concurrency/TimerManager.h b/lib/cpp/src/thrift/concurrency/TimerManager.h index ba792264c..4d73b002d 100644 --- a/lib/cpp/src/thrift/concurrency/TimerManager.h +++ b/lib/cpp/src/thrift/concurrency/TimerManager.h @@ -22,7 +22,7 @@ #include #include -#include +#include #include #include diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp index f16fce789..31ff2a96a 100644 --- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp +++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp @@ -22,7 +22,7 @@ #include #include #include -#include +#include #include #include @@ -1118,12 +1118,7 @@ void TNonblockingServer::registerEvents(event_base* user_event_base) { // Launch all the secondary IO threads in separate threads if (ioThreads_.size() > 1) { - ioThreadFactory_.reset(new PlatformThreadFactory( -#if !USE_STD_THREAD - PlatformThreadFactory::OTHER, // scheduler - PlatformThreadFactory::NORMAL, // priority - 1, // stack size (MB) -#endif + ioThreadFactory_.reset(new ThreadFactory( false // detached )); diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.h b/lib/cpp/src/thrift/server/TNonblockingServer.h index e79c24f62..2c2389c3c 100644 --- a/lib/cpp/src/thrift/server/TNonblockingServer.h +++ b/lib/cpp/src/thrift/server/TNonblockingServer.h @@ -30,7 +30,7 @@ #include #include #include -#include +#include #include #include #include @@ -53,7 +53,7 @@ using apache::thrift::transport::TNonblockingServerTransport; using apache::thrift::protocol::TProtocol; using apache::thrift::concurrency::Runnable; using apache::thrift::concurrency::ThreadManager; -using apache::thrift::concurrency::PlatformThreadFactory; +using apache::thrift::concurrency::ThreadFactory; using apache::thrift::concurrency::ThreadFactory; using apache::thrift::concurrency::Thread; using apache::thrift::concurrency::Mutex; @@ -166,7 +166,7 @@ private: bool threadPoolProcessing_; // Factory to create the IO threads - std::shared_ptr ioThreadFactory_; + std::shared_ptr ioThreadFactory_; // Vector of IOThread objects that will handle our IO std::vector > ioThreads_; @@ -386,9 +386,7 @@ public: /** * Sets the number of IO threads used by this server. Can only be used before - * the call to serve() and has no effect afterwards. We always use a - * PosixThreadFactory for the IO worker threads, because they must joinable - * for clean shutdown. + * the call to serve() and has no effect afterwards. */ void setNumIOThreads(size_t numThreads) { numIOThreads_ = numThreads; diff --git a/lib/cpp/src/thrift/server/TThreadedServer.cpp b/lib/cpp/src/thrift/server/TThreadedServer.cpp index 2264df79b..ed2d80d00 100644 --- a/lib/cpp/src/thrift/server/TThreadedServer.cpp +++ b/lib/cpp/src/thrift/server/TThreadedServer.cpp @@ -19,7 +19,7 @@ #include #include -#include +#include #include namespace apache { diff --git a/lib/cpp/src/thrift/server/TThreadedServer.h b/lib/cpp/src/thrift/server/TThreadedServer.h index c5ccd03c8..9fc9d1125 100644 --- a/lib/cpp/src/thrift/server/TThreadedServer.h +++ b/lib/cpp/src/thrift/server/TThreadedServer.h @@ -22,7 +22,7 @@ #include #include -#include +#include #include #include @@ -44,7 +44,7 @@ public: const std::shared_ptr& protocolFactory, const std::shared_ptr& threadFactory = std::shared_ptr( - new apache::thrift::concurrency::PlatformThreadFactory(false))); + new apache::thrift::concurrency::ThreadFactory(false))); TThreadedServer( const std::shared_ptr& processor, @@ -53,7 +53,7 @@ public: const std::shared_ptr& protocolFactory, const std::shared_ptr& threadFactory = std::shared_ptr( - new apache::thrift::concurrency::PlatformThreadFactory(false))); + new apache::thrift::concurrency::ThreadFactory(false))); TThreadedServer( const std::shared_ptr& processorFactory, @@ -64,7 +64,7 @@ public: const std::shared_ptr& outputProtocolFactory, const std::shared_ptr& threadFactory = std::shared_ptr( - new apache::thrift::concurrency::PlatformThreadFactory(false))); + new apache::thrift::concurrency::ThreadFactory(false))); TThreadedServer( const std::shared_ptr& processor, @@ -75,7 +75,7 @@ public: const std::shared_ptr& outputProtocolFactory, const std::shared_ptr& threadFactory = std::shared_ptr( - new apache::thrift::concurrency::PlatformThreadFactory(false))); + new apache::thrift::concurrency::ThreadFactory(false))); virtual ~TThreadedServer(); diff --git a/lib/cpp/src/thrift/transport/TFileTransport.h b/lib/cpp/src/thrift/transport/TFileTransport.h index 6cc7bd24b..ece271aae 100644 --- a/lib/cpp/src/thrift/transport/TFileTransport.h +++ b/lib/cpp/src/thrift/transport/TFileTransport.h @@ -30,7 +30,7 @@ #include #include -#include +#include #include namespace apache { @@ -336,7 +336,7 @@ private: static const uint32_t DEFAULT_WRITER_THREAD_SLEEP_TIME_US = 60 * 1000 * 1000; // writer thread - apache::thrift::concurrency::PlatformThreadFactory threadFactory_; + apache::thrift::concurrency::ThreadFactory threadFactory_; std::shared_ptr writerThread_; // buffers to hold data before it is flushed. Each element of the buffer stores a msg that diff --git a/lib/cpp/src/thrift/windows/config.h b/lib/cpp/src/thrift/windows/config.h index a5f44577d..14a3f4f36 100644 --- a/lib/cpp/src/thrift/windows/config.h +++ b/lib/cpp/src/thrift/windows/config.h @@ -28,11 +28,6 @@ #error "This is a Windows header only" #endif -// use std::thread in MSVC11 (2012) or newer and in MinGW -#if (_MSC_VER >= 1700) || defined(__MINGW32__) -#define USE_STD_THREAD 1 -#endif - // Something that defines PRId64 is required to build #define HAVE_INTTYPES_H 1 diff --git a/lib/cpp/test/CMakeLists.txt b/lib/cpp/test/CMakeLists.txt index b30ef1780..8a8aadad1 100644 --- a/lib/cpp/test/CMakeLists.txt +++ b/lib/cpp/test/CMakeLists.txt @@ -84,11 +84,6 @@ set(UnitTest_SOURCES TServerTransportTest.cpp ) -if(NOT WITH_STDTHREADS AND NOT MSVC AND NOT MINGW) - list(APPEND UnitTest_SOURCES concurrency/MutexTest.cpp) - list(APPEND UnitTest_SOURCES concurrency/RWMutexStarveTest.cpp) -endif() - add_executable(UnitTests ${UnitTest_SOURCES}) target_link_libraries(UnitTests testgencpp ${Boost_LIBRARIES}) LINK_AGAINST_THRIFT_LIBRARY(UnitTests thrift) diff --git a/lib/cpp/test/Makefile.am b/lib/cpp/test/Makefile.am index d645a6501..5bb9eb760 100755 --- a/lib/cpp/test/Makefile.am +++ b/lib/cpp/test/Makefile.am @@ -135,10 +135,6 @@ UnitTests_SOURCES = \ TServerTransportTest.cpp \ TTransportCheckThrow.h -UnitTests_SOURCES += \ - concurrency/MutexTest.cpp \ - concurrency/RWMutexStarveTest.cpp - UnitTests_LDADD = \ libtestgencpp.la \ $(BOOST_TEST_LDADD) \ diff --git a/lib/cpp/test/TNonblockingSSLServerTest.cpp b/lib/cpp/test/TNonblockingSSLServerTest.cpp index 330380b34..2111de8b0 100644 --- a/lib/cpp/test/TNonblockingSSLServerTest.cpp +++ b/lib/cpp/test/TNonblockingSSLServerTest.cpp @@ -218,12 +218,7 @@ protected: runner->userEventBase = userEventBase_; std::unique_ptr threadFactory( - new apache::thrift::concurrency::PlatformThreadFactory( -#if !USE_STD_THREAD - concurrency::PlatformThreadFactory::OTHER, concurrency::PlatformThreadFactory::NORMAL, - 1, -#endif - false)); + new apache::thrift::concurrency::ThreadFactory(false)); thread = threadFactory->newThread(runner); thread->start(); runner->readyBarrier(); diff --git a/lib/cpp/test/TNonblockingServerTest.cpp b/lib/cpp/test/TNonblockingServerTest.cpp index f0bb283ff..f2f592203 100644 --- a/lib/cpp/test/TNonblockingServerTest.cpp +++ b/lib/cpp/test/TNonblockingServerTest.cpp @@ -33,7 +33,7 @@ using apache::thrift::concurrency::Guard; using apache::thrift::concurrency::Monitor; using apache::thrift::concurrency::Mutex; -using apache::thrift::concurrency::PlatformThreadFactory; +using apache::thrift::concurrency::ThreadFactory; using apache::thrift::concurrency::Runnable; using apache::thrift::concurrency::Thread; using apache::thrift::concurrency::ThreadFactory; @@ -147,12 +147,7 @@ protected: runner->userEventBase = userEventBase_; shared_ptr threadFactory( - new PlatformThreadFactory( -#if !USE_STD_THREAD - PlatformThreadFactory::OTHER, PlatformThreadFactory::NORMAL, - 1, -#endif - false)); + new ThreadFactory(false)); thread = threadFactory->newThread(runner); thread->start(); runner->readyBarrier(); diff --git a/lib/cpp/test/TServerIntegrationTest.cpp b/lib/cpp/test/TServerIntegrationTest.cpp index 7976c8bcf..a7680d89e 100644 --- a/lib/cpp/test/TServerIntegrationTest.cpp +++ b/lib/cpp/test/TServerIntegrationTest.cpp @@ -379,7 +379,7 @@ BOOST_FIXTURE_TEST_CASE(test_threadpool_factory, TServerIntegrationProcessorFactoryTestFixture) { pServer->getThreadManager()->threadFactory( shared_ptr( - new apache::thrift::concurrency::PlatformThreadFactory)); + new apache::thrift::concurrency::ThreadFactory)); pServer->getThreadManager()->start(); // thread factory has 4 threads as a default @@ -394,7 +394,7 @@ BOOST_FIXTURE_TEST_CASE(test_threadpool, TServerIntegrationProcessorTestFixture) { pServer->getThreadManager()->threadFactory( shared_ptr( - new apache::thrift::concurrency::PlatformThreadFactory)); + new apache::thrift::concurrency::ThreadFactory)); pServer->getThreadManager()->start(); // thread factory has 4 threads as a default @@ -409,7 +409,7 @@ BOOST_FIXTURE_TEST_CASE(test_threadpool_bound, TServerIntegrationProcessorTestFixture) { pServer->getThreadManager()->threadFactory( shared_ptr( - new apache::thrift::concurrency::PlatformThreadFactory)); + new apache::thrift::concurrency::ThreadFactory)); pServer->getThreadManager()->start(); pServer->setConcurrentClientLimit(4); @@ -420,7 +420,7 @@ BOOST_FIXTURE_TEST_CASE(test_threadpool_stress, TServerIntegrationProcessorTestFixture) { pServer->getThreadManager()->threadFactory( shared_ptr( - new apache::thrift::concurrency::PlatformThreadFactory)); + new apache::thrift::concurrency::ThreadFactory)); pServer->getThreadManager()->start(); stress(10, boost::posix_time::seconds(3)); diff --git a/lib/cpp/test/TransportTest.cpp b/lib/cpp/test/TransportTest.cpp index ce1954469..387207142 100644 --- a/lib/cpp/test/TransportTest.cpp +++ b/lib/cpp/test/TransportTest.cpp @@ -1031,7 +1031,7 @@ struct global_fixture { apache::thrift::transport::TWinsockSingleton::create(); #endif - apache::thrift::concurrency::PlatformThreadFactory factory; + apache::thrift::concurrency::ThreadFactory factory; factory.setDetached(false); alarmThread_ = factory.newThread( diff --git a/lib/cpp/test/concurrency/MutexTest.cpp b/lib/cpp/test/concurrency/MutexTest.cpp deleted file mode 100644 index 781ec1a40..000000000 --- a/lib/cpp/test/concurrency/MutexTest.cpp +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -// This is linked into the UnitTests test executable - -#include - -#include "thrift/concurrency/Exception.h" -#include "thrift/concurrency/Mutex.h" - -using boost::unit_test::test_suite; -using boost::unit_test::framework::master_test_suite; - -using namespace apache::thrift::concurrency; - -struct LFAT -{ - LFAT() - : uut(Mutex::ERRORCHECK_INITIALIZER) - { - BOOST_CHECK_EQUAL(0, pthread_mutex_init(&mx, 0)); - BOOST_CHECK_EQUAL(0, pthread_cond_init(&cv, 0)); - } - - Mutex uut; - pthread_mutex_t mx; - pthread_cond_t cv; -}; - -// Helper for testing mutex behavior when locked by another thread -void * lockFromAnotherThread(void *ptr) -{ - struct LFAT *lfat = (LFAT *)ptr; - BOOST_CHECK_EQUAL (0, pthread_mutex_lock(&lfat->mx)); // synchronize with testing thread - BOOST_CHECK_NO_THROW( lfat->uut.lock()); - BOOST_CHECK_EQUAL (0, pthread_cond_signal(&lfat->cv)); // tell testing thread we have locked the mutex - BOOST_CHECK_EQUAL (0, pthread_cond_wait(&lfat->cv, &lfat->mx)); // wait for testing thread to signal condition variable telling us to unlock - BOOST_CHECK_NO_THROW( lfat->uut.unlock()); - return ptr; // testing thread should join to ensure completeness -} - -BOOST_AUTO_TEST_SUITE(MutexTest) - -BOOST_AUTO_TEST_CASE(happy_path) -{ - Mutex uut(Mutex::ERRORCHECK_INITIALIZER); // needed to test unlocking twice without undefined behavior - - BOOST_CHECK_NO_THROW( uut.lock()); - BOOST_CHECK_THROW ( uut.lock(), SystemResourceException); // EDEADLK (this thread owns it) - BOOST_CHECK_NO_THROW( uut.unlock()); -} - -BOOST_AUTO_TEST_CASE(recursive_happy_path) -{ - Mutex uut(Mutex::RECURSIVE_INITIALIZER); - - BOOST_CHECK_NO_THROW( uut.lock()); - BOOST_CHECK_NO_THROW( uut.lock()); - BOOST_CHECK_NO_THROW( uut.unlock()); - BOOST_CHECK_NO_THROW( uut.lock()); - BOOST_CHECK_NO_THROW( uut.lock()); - BOOST_CHECK_NO_THROW( uut.unlock()); - BOOST_CHECK_NO_THROW( uut.lock()); - BOOST_CHECK_NO_THROW( uut.unlock()); - BOOST_CHECK_NO_THROW( uut.unlock()); - BOOST_CHECK_NO_THROW( uut.unlock()); -} - -BOOST_AUTO_TEST_CASE(trylock) -{ - Mutex uut(Mutex::ADAPTIVE_INITIALIZER); // just using another initializer for coverage - - BOOST_CHECK ( uut.trylock()); - BOOST_CHECK (!uut.trylock()); - BOOST_CHECK_NO_THROW( uut.unlock()); -} - -BOOST_AUTO_TEST_CASE(timedlock) -{ - pthread_t th; - struct LFAT lfat; - - BOOST_CHECK ( lfat.uut.timedlock(100)); - BOOST_CHECK_THROW ( lfat.uut.timedlock(100), - SystemResourceException); // EDEADLK (current thread owns mutex - logic error) - BOOST_CHECK_NO_THROW( lfat.uut.unlock()); - - BOOST_CHECK_EQUAL (0, pthread_mutex_lock(&lfat.mx)); // synchronize with helper thread - BOOST_CHECK_EQUAL (0, pthread_create(&th, NULL, - lockFromAnotherThread, &lfat)); // create helper thread - BOOST_CHECK_EQUAL (0, pthread_cond_wait(&lfat.cv, &lfat.mx)); // wait for helper thread to lock mutex - - BOOST_CHECK (!lfat.uut.timedlock(100)); // false: another thread owns the lock - - BOOST_CHECK_EQUAL (0, pthread_cond_signal(&lfat.cv)); // tell helper thread we are done - BOOST_CHECK_EQUAL (0, pthread_mutex_unlock(&lfat.mx)); // let helper thread clean up - BOOST_CHECK_EQUAL (0, pthread_join(th, 0)); // wait for testing thread to unlock and be done -} - -BOOST_AUTO_TEST_CASE(underlying) -{ - Mutex uut; - - BOOST_CHECK ( uut.getUnderlyingImpl()); -} - -BOOST_AUTO_TEST_SUITE_END() diff --git a/lib/cpp/test/concurrency/RWMutexStarveTest.cpp b/lib/cpp/test/concurrency/RWMutexStarveTest.cpp deleted file mode 100644 index 985a230a0..000000000 --- a/lib/cpp/test/concurrency/RWMutexStarveTest.cpp +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -// This is linked into the UnitTests test executable - -#include - -#include "thrift/concurrency/Mutex.h" -#include "thrift/concurrency/PosixThreadFactory.h" -#include - -using std::shared_ptr; -using boost::unit_test::test_suite; -using boost::unit_test::framework::master_test_suite; - -using namespace apache::thrift::concurrency; - -class Locker : public Runnable { -protected: - Locker(shared_ptr rwlock, bool writer) - : rwlock_(rwlock), writer_(writer), started_(false), gotLock_(false), signaled_(false) {} - -public: - virtual void run() { - started_ = true; - if (writer_) { - rwlock_->acquireWrite(); - } else { - rwlock_->acquireRead(); - } - gotLock_ = true; - while (!signaled_) { - usleep(5000); - } - rwlock_->release(); - } - - bool started() const { return started_; } - bool gotLock() const { return gotLock_; } - void signal() { signaled_ = true; } - -protected: - shared_ptr rwlock_; - bool writer_; - volatile bool started_; - volatile bool gotLock_; - volatile bool signaled_; -}; - -class Reader : public Locker { -public: - Reader(shared_ptr rwlock) : Locker(rwlock, false) {} -}; - -class Writer : public Locker { -public: - Writer(shared_ptr rwlock) : Locker(rwlock, true) {} -}; - -void test_starve(PosixThreadFactory::POLICY policy) { - // the man pages for pthread_wrlock_rdlock suggest that any OS guarantee about - // writer starvation may be influenced by the scheduling policy, so let's try - // all 3 policies to see if any of them work. - PosixThreadFactory factory(policy); - factory.setDetached(false); - - shared_ptr rwlock(new NoStarveReadWriteMutex()); - - shared_ptr reader1(new Reader(rwlock)); - shared_ptr reader2(new Reader(rwlock)); - shared_ptr writer(new Writer(rwlock)); - - shared_ptr treader1 = factory.newThread(reader1); - shared_ptr treader2 = factory.newThread(reader2); - shared_ptr twriter = factory.newThread(writer); - - // launch a reader and make sure he has the lock - treader1->start(); - while (!reader1->gotLock()) { - usleep(2000); - } - - // launch a writer and make sure he's blocked on the lock - twriter->start(); - while (!writer->started()) { - usleep(2000); - } - // tricky part... we can never be 100% sure that the writer is actually - // blocked on the lock, but we can pretty reasonably sure because we know - // he just executed the line immediately before getting the lock, and - // we'll wait a full second for him to get on it. - sleep(1); - - // launch a second reader... if the RWMutex guarantees that writers won't - // starve, this reader should not be able to acquire the lock until the writer - // has acquired and released it. - treader2->start(); - while (!reader2->started()) { - usleep(2000); - } - // again... can't be 100% sure the reader is waiting on (or has) the lock - // but we can be close. - sleep(1); - - // tell reader 1 to let go of the lock - reader1->signal(); - - // wait for someone to get the lock - while (!reader2->gotLock() && !writer->gotLock()) { - usleep(2000); - } - - // the test succeeded if the WRITER got the lock. - bool success = writer->gotLock(); - - // tell everyone we're done and wait for them to finish - reader2->signal(); - writer->signal(); - treader1->join(); - treader2->join(); - twriter->join(); - - // make sure it worked. - BOOST_CHECK_MESSAGE(success, "writer is starving"); -} - -BOOST_AUTO_TEST_SUITE(RWMutexStarveTest) - -BOOST_AUTO_TEST_CASE(test_starve_other) { - test_starve(PosixThreadFactory::OTHER); -} - -BOOST_AUTO_TEST_CASE(test_starve_rr) { - test_starve(PosixThreadFactory::ROUND_ROBIN); -} - -BOOST_AUTO_TEST_CASE(test_starve_fifo) { - test_starve(PosixThreadFactory::FIFO); -} - -BOOST_AUTO_TEST_SUITE_END() diff --git a/lib/cpp/test/concurrency/ThreadFactoryTests.h b/lib/cpp/test/concurrency/ThreadFactoryTests.h index ba9850286..8ab754c89 100644 --- a/lib/cpp/test/concurrency/ThreadFactoryTests.h +++ b/lib/cpp/test/concurrency/ThreadFactoryTests.h @@ -19,7 +19,7 @@ #include #include -#include +#include #include #include #include @@ -66,7 +66,7 @@ public: bool reapNThreads(int loop = 1, int count = 10) { - PlatformThreadFactory threadFactory = PlatformThreadFactory(); + ThreadFactory threadFactory = ThreadFactory(); shared_ptr monitor(new Monitor); for (int lix = 0; lix < loop; lix++) { @@ -159,7 +159,7 @@ public: shared_ptr task = shared_ptr(new SynchStartTask(monitor, state)); - PlatformThreadFactory threadFactory = PlatformThreadFactory(); + ThreadFactory threadFactory = ThreadFactory(); shared_ptr thread = threadFactory.newThread(task); @@ -265,7 +265,7 @@ public: Monitor& _mon; }; - void foo(PlatformThreadFactory* tf) { (void)tf; } + void foo(ThreadFactory* tf) { (void)tf; } bool floodNTest(size_t loop = 1, size_t count = 100000) { @@ -274,7 +274,7 @@ public: for (size_t lix = 0; lix < loop; lix++) { - PlatformThreadFactory threadFactory = PlatformThreadFactory(); + ThreadFactory threadFactory = ThreadFactory(); threadFactory.setDetached(true); for (size_t tix = 0; tix < count; tix++) { diff --git a/lib/cpp/test/concurrency/ThreadManagerTests.h b/lib/cpp/test/concurrency/ThreadManagerTests.h index d6c092d6c..b3a319a57 100644 --- a/lib/cpp/test/concurrency/ThreadManagerTests.h +++ b/lib/cpp/test/concurrency/ThreadManagerTests.h @@ -19,7 +19,7 @@ #include #include -#include +#include #include #include @@ -108,12 +108,9 @@ public: shared_ptr threadManager = ThreadManager::newSimpleThreadManager(workerCount); - shared_ptr threadFactory - = shared_ptr(new PlatformThreadFactory(false)); + shared_ptr threadFactory + = shared_ptr(new ThreadFactory(false)); -#if !USE_STD_THREAD - threadFactory->setPriority(PosixThreadFactory::HIGHEST); -#endif threadManager->threadFactory(threadFactory); threadManager->start(); @@ -257,12 +254,9 @@ public: shared_ptr threadManager = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount); - shared_ptr threadFactory - = shared_ptr(new PlatformThreadFactory()); + shared_ptr threadFactory + = shared_ptr(new ThreadFactory()); -#if !USE_STD_THREAD - threadFactory->setPriority(PosixThreadFactory::HIGHEST); -#endif threadManager->threadFactory(threadFactory); threadManager->start(); @@ -401,54 +395,15 @@ public: return false; } -#if !USE_STD_THREAD - // test once with a detached thread factory and once with a joinable thread factory - - shared_ptr threadFactory - = shared_ptr(new PosixThreadFactory(false)); - - std::cout << "\t\t\tapiTest with joinable thread factory" << std::endl; - if (!apiTestWithThreadFactory(threadFactory)) { - return false; - } - - threadFactory.reset(new PosixThreadFactory(true)); - std::cout << "\t\t\tapiTest with detached thread factory" << std::endl; - return apiTestWithThreadFactory(threadFactory); -#else - return apiTestWithThreadFactory(shared_ptr(new PlatformThreadFactory())); -#endif + return apiTestWithThreadFactory(shared_ptr(new ThreadFactory())); } - bool apiTestWithThreadFactory(shared_ptr threadFactory) + bool apiTestWithThreadFactory(shared_ptr threadFactory) { shared_ptr threadManager = ThreadManager::newSimpleThreadManager(1); threadManager->threadFactory(threadFactory); -#if !USE_STD_THREAD - threadFactory->setPriority(PosixThreadFactory::HIGHEST); - - // verify we cannot change the thread factory to one with the opposite detached setting - shared_ptr threadFactory2 - = shared_ptr(new PlatformThreadFactory( - PosixThreadFactory::ROUND_ROBIN, - PosixThreadFactory::NORMAL, - 1, - !threadFactory->isDetached())); - try { - threadManager->threadFactory(threadFactory2); - // if the call succeeded we changed the thread factory to one that had the opposite setting for "isDetached()". - // this is bad, because the thread manager checks with the thread factory to see if it should join threads - // as they are leaving - so the detached status of new threads cannot change while there are existing threads. - std::cerr << "\t\t\tShould not be able to change thread factory detached disposition" << std::endl; - return false; - } - catch (InvalidArgumentException& ex) { - /* expected */ - } -#endif - std::cout << "\t\t\t\tstarting.. " << std::endl; threadManager->start(); diff --git a/lib/cpp/test/concurrency/TimerManagerTests.h b/lib/cpp/test/concurrency/TimerManagerTests.h index 1c52c470b..c15b14b80 100644 --- a/lib/cpp/test/concurrency/TimerManagerTests.h +++ b/lib/cpp/test/concurrency/TimerManagerTests.h @@ -18,7 +18,7 @@ */ #include -#include +#include #include #include @@ -80,7 +80,7 @@ public: { TimerManager timerManager; - timerManager.threadFactory(shared_ptr(new PlatformThreadFactory())); + timerManager.threadFactory(shared_ptr(new ThreadFactory())); timerManager.start(); if (timerManager.state() != TimerManager::STARTED) { std::cerr << "timerManager is not in the STARTED state, but should be" << std::endl; @@ -125,7 +125,7 @@ public: */ bool test01(int64_t timeout = 1000LL) { TimerManager timerManager; - timerManager.threadFactory(shared_ptr(new PlatformThreadFactory())); + timerManager.threadFactory(shared_ptr(new ThreadFactory())); timerManager.start(); assert(timerManager.state() == TimerManager::STARTED); @@ -158,7 +158,7 @@ public: */ bool test02(int64_t timeout = 1000LL) { TimerManager timerManager; - timerManager.threadFactory(shared_ptr(new PlatformThreadFactory())); + timerManager.threadFactory(shared_ptr(new ThreadFactory())); timerManager.start(); assert(timerManager.state() == TimerManager::STARTED); @@ -191,7 +191,7 @@ public: */ bool test03(int64_t timeout = 1000LL) { TimerManager timerManager; - timerManager.threadFactory(shared_ptr(new PlatformThreadFactory())); + timerManager.threadFactory(shared_ptr(new ThreadFactory())); timerManager.start(); assert(timerManager.state() == TimerManager::STARTED); @@ -228,7 +228,7 @@ public: */ bool test04(int64_t timeout = 1000LL) { TimerManager timerManager; - timerManager.threadFactory(shared_ptr(new PlatformThreadFactory())); + timerManager.threadFactory(shared_ptr(new ThreadFactory())); timerManager.start(); assert(timerManager.state() == TimerManager::STARTED); diff --git a/lib/cpp/test/processor/ProcessorTest.cpp b/lib/cpp/test/processor/ProcessorTest.cpp index 36ce01311..9483a0e91 100644 --- a/lib/cpp/test/processor/ProcessorTest.cpp +++ b/lib/cpp/test/processor/ProcessorTest.cpp @@ -25,7 +25,7 @@ #include -#include +#include #include #include #include @@ -94,7 +94,7 @@ public: const std::shared_ptr& protocolFactory) { std::shared_ptr socket(new TServerSocket(port)); - std::shared_ptr threadFactory(new PlatformThreadFactory); + std::shared_ptr threadFactory(new ThreadFactory); std::shared_ptr threadManager = ThreadManager::newSimpleThreadManager(8); threadManager->threadFactory(threadFactory); threadManager->start(); @@ -123,7 +123,7 @@ public: } std::shared_ptr socket(new TNonblockingServerSocket(port)); - std::shared_ptr threadFactory(new PlatformThreadFactory); + std::shared_ptr threadFactory(new ThreadFactory); std::shared_ptr threadManager = ThreadManager::newSimpleThreadManager(8); threadManager->threadFactory(threadFactory); threadManager->start(); diff --git a/lib/cpp/test/processor/ServerThread.cpp b/lib/cpp/test/processor/ServerThread.cpp index 4d1ec4c1d..b0505005b 100644 --- a/lib/cpp/test/processor/ServerThread.cpp +++ b/lib/cpp/test/processor/ServerThread.cpp @@ -21,7 +21,7 @@ #include "ServerThread.h" -#include +#include #include #include #include @@ -38,7 +38,7 @@ void ServerThread::start() { helper_.reset(new Helper(this)); // Start the other thread - concurrency::PlatformThreadFactory threadFactory; + concurrency::ThreadFactory threadFactory; threadFactory.setDetached(false); thread_ = threadFactory.newThread(helper_); -- cgit v1.2.1