diff options
23 files changed, 805 insertions, 624 deletions
diff --git a/appveyor.yml b/appveyor.yml index cfd8b51f3..03ee29543 100755 --- a/appveyor.yml +++ b/appveyor.yml @@ -89,6 +89,5 @@ build_script: # CTest fails to invoke ant seemingly due to "ant.bat" v.s. "ant" (shell script) conflict. # Currently, everything that involves OpenSSL seems to hang forever on our Appveyor setup. # Also a few C++ tests hang (on Appveyor or on Windows in general). -- ctest -C Release --timeout 600 -VV -E "(concurrency_test|processor_test|TInterruptTest|StressTestNonBlocking|OpenSSLManualInitTest|SecurityTest|PythonTestSSLSocket|python_test$|^Java)" - -#TODO make it perfect ;-r +- ctest -C Release --timeout 600 -VV -E "(StressTestNonBlocking|PythonTestSSLSocket|python_test$|^Java)" +# TODO make it perfect ;-r diff --git a/build/docker/scripts/cmake.sh b/build/docker/scripts/cmake.sh index 26ccb109c..6508e7108 100755 --- a/build/docker/scripts/cmake.sh +++ b/build/docker/scripts/cmake.sh @@ -19,4 +19,5 @@ for LIB in $BUILD_LIBS; do done $MAKEPROG -j3 cpack -ctest -VV -E "(concurrency_test|processor_test)" +ctest -VV +# was: -E "(concurrency_test|processor_test)" diff --git a/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp b/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp index 96cb6d65d..a72d38b9c 100644 --- a/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp +++ b/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp @@ -126,54 +126,19 @@ void* BoostThread::threadMain(void* arg) { return (void*)0; } -/** - * POSIX Thread factory implementation - */ -class BoostThreadFactory::Impl { - -private: - bool detached_; - -public: - Impl(bool detached) : detached_(detached) {} - - /** - * Creates a new POSIX thread to run the runnable object - * - * @param runnable A runnable object - */ - shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const { - shared_ptr<BoostThread> result = shared_ptr<BoostThread>(new BoostThread(detached_, runnable)); - result->weakRef(result); - runnable->thread(result); - return result; - } - - bool isDetached() const { return detached_; } - - void setDetached(bool value) { detached_ = value; } - - Thread::id_t getCurrentThreadId() const { return boost::this_thread::get_id(); } -}; - BoostThreadFactory::BoostThreadFactory(bool detached) - : impl_(new BoostThreadFactory::Impl(detached)) { + : ThreadFactory(detached) { } shared_ptr<Thread> BoostThreadFactory::newThread(shared_ptr<Runnable> runnable) const { - return impl_->newThread(runnable); -} - -bool BoostThreadFactory::isDetached() const { - return impl_->isDetached(); -} - -void BoostThreadFactory::setDetached(bool value) { - impl_->setDetached(value); + shared_ptr<BoostThread> result = shared_ptr<BoostThread>(new BoostThread(isDetached(), runnable)); + result->weakRef(result); + runnable->thread(result); + return result; } Thread::id_t BoostThreadFactory::getCurrentThreadId() const { - return impl_->getCurrentThreadId(); + return boost::this_thread::get_id(); } } } diff --git a/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h b/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h index e6d1a56de..7973245a0 100644 --- a/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h +++ b/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h @@ -55,21 +55,8 @@ public: // From ThreadFactory; Thread::id_t getCurrentThreadId() const; - - /** - * Sets detached mode of threads - */ - virtual void setDetached(bool detached); - - /** - * Gets current detached mode - */ - virtual bool isDetached() const; - -private: - class Impl; - boost::shared_ptr<Impl> impl_; }; + } } } // apache::thrift::concurrency diff --git a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp index 05a3c028c..6a0b47cb4 100644 --- a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp +++ b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp @@ -214,149 +214,104 @@ void* PthreadThread::threadMain(void* arg) { } /** - * POSIX Thread factory implementation + * Converts generic posix thread schedule policy enums into pthread + * API values. */ -class PosixThreadFactory::Impl { - -private: - POLICY policy_; - PRIORITY priority_; - int stackSize_; - bool detached_; - - /** - * Converts generic posix thread schedule policy enums into pthread - * API values. - */ - static int toPthreadPolicy(POLICY policy) { - switch (policy) { - case OTHER: - return SCHED_OTHER; - case FIFO: - return SCHED_FIFO; - case ROUND_ROBIN: - return SCHED_RR; - } +static int toPthreadPolicy(PosixThreadFactory::POLICY policy) { + switch (policy) { + case PosixThreadFactory::OTHER: return SCHED_OTHER; + case PosixThreadFactory::FIFO: + return SCHED_FIFO; + case PosixThreadFactory::ROUND_ROBIN: + return SCHED_RR; } + return SCHED_OTHER; +} - /** - * Converts relative thread priorities to absolute value based on posix - * thread scheduler policy - * - * The idea is simply to divide up the priority range for the given policy - * into the correpsonding relative priority level (lowest..highest) and - * then pro-rate accordingly. - */ - static int toPthreadPriority(POLICY policy, PRIORITY priority) { - int pthread_policy = toPthreadPolicy(policy); - int min_priority = 0; - int max_priority = 0; +/** + * Converts relative thread priorities to absolute value based on posix + * thread scheduler policy + * + * The idea is simply to divide up the priority range for the given policy + * into the correpsonding relative priority level (lowest..highest) and + * then pro-rate accordingly. + */ +static int toPthreadPriority(PosixThreadFactory::POLICY policy, PosixThreadFactory::PRIORITY priority) { + int pthread_policy = toPthreadPolicy(policy); + int min_priority = 0; + int max_priority = 0; #ifdef HAVE_SCHED_GET_PRIORITY_MIN - min_priority = sched_get_priority_min(pthread_policy); + min_priority = sched_get_priority_min(pthread_policy); #endif #ifdef HAVE_SCHED_GET_PRIORITY_MAX - max_priority = sched_get_priority_max(pthread_policy); + max_priority = sched_get_priority_max(pthread_policy); #endif - int quanta = (HIGHEST - LOWEST) + 1; - float stepsperquanta = (float)(max_priority - min_priority) / quanta; - - if (priority <= HIGHEST) { - return (int)(min_priority + stepsperquanta * priority); - } else { - // should never get here for priority increments. - assert(false); - return (int)(min_priority + stepsperquanta * NORMAL); - } - } - -public: - Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) - : policy_(policy), priority_(priority), stackSize_(stackSize), detached_(detached) {} - - /** - * Creates a new POSIX thread to run the runnable object - * - * @param runnable A runnable object - */ - shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const { - shared_ptr<PthreadThread> result - = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), - toPthreadPriority(policy_, priority_), - stackSize_, - detached_, - runnable)); - result->weakRef(result); - runnable->thread(result); - return result; - } - - int getStackSize() const { return stackSize_; } - - void setStackSize(int value) { stackSize_ = value; } - - PRIORITY getPriority() const { return priority_; } - - /** - * Sets priority. - * - * XXX - * Need to handle incremental priorities properly. - */ - void setPriority(PRIORITY value) { priority_ = value; } - - bool isDetached() const { return detached_; } - - void setDetached(bool value) { detached_ = value; } - - Thread::id_t getCurrentThreadId() const { - -#ifndef _WIN32 - return (Thread::id_t)pthread_self(); -#else - return (Thread::id_t)pthread_self().p; -#endif // _WIN32 + int quanta = (PosixThreadFactory::HIGHEST - PosixThreadFactory::LOWEST) + 1; + float stepsperquanta = (float)(max_priority - min_priority) / quanta; + + if (priority <= PosixThreadFactory::HIGHEST) { + return (int)(min_priority + stepsperquanta * priority); + } else { + // should never get here for priority increments. + assert(false); + return (int)(min_priority + stepsperquanta * PosixThreadFactory::NORMAL); } -}; +} PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) - : impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) { + : ThreadFactory(detached), + policy_(policy), + priority_(priority), + stackSize_(stackSize) { +} + +PosixThreadFactory::PosixThreadFactory(bool detached) + : ThreadFactory(detached), + policy_(ROUND_ROBIN), + priority_(NORMAL), + stackSize_(1) { } shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { - return impl_->newThread(runnable); + shared_ptr<PthreadThread> result + = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), + toPthreadPriority(policy_, priority_), + stackSize_, + isDetached(), + runnable)); + result->weakRef(result); + runnable->thread(result); + return result; } int PosixThreadFactory::getStackSize() const { - return impl_->getStackSize(); + return stackSize_; } void PosixThreadFactory::setStackSize(int value) { - impl_->setStackSize(value); + stackSize_ = value; } PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const { - return impl_->getPriority(); -} - -void PosixThreadFactory::setPriority(PosixThreadFactory::PRIORITY value) { - impl_->setPriority(value); -} - -bool PosixThreadFactory::isDetached() const { - return impl_->isDetached(); + return priority_; } -void PosixThreadFactory::setDetached(bool value) { - impl_->setDetached(value); +void PosixThreadFactory::setPriority(PRIORITY value) { + priority_ = value; } Thread::id_t PosixThreadFactory::getCurrentThreadId() const { - return impl_->getCurrentThreadId(); +#ifndef _WIN32 + return (Thread::id_t)pthread_self(); +#else + return (Thread::id_t)pthread_self().p; +#endif // _WIN32 } + } } } // apache::thrift::concurrency diff --git a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h index b26d296d6..c1bbe5c3b 100644 --- a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h +++ b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h @@ -74,12 +74,19 @@ public: * * By default threads are not joinable. */ - PosixThreadFactory(POLICY policy = ROUND_ROBIN, PRIORITY priority = NORMAL, int stackSize = 1, bool detached = true); + /** + * Provide a constructor compatible with the other factories + * The default policy is POLICY::ROUND_ROBIN. + * The default priority is PRIORITY::NORMAL. + * The default stackSize is 1. + */ + PosixThreadFactory(bool detached); + // From ThreadFactory; boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const; @@ -87,14 +94,14 @@ public: Thread::id_t getCurrentThreadId() const; /** - * Gets stack size for created threads + * Gets stack size for newly created threads * * @return int size in megabytes */ virtual int getStackSize() const; /** - * Sets stack size for created threads + * Sets stack size for newly created threads * * @param value size in megabytes */ @@ -110,19 +117,10 @@ public: */ virtual void setPriority(PRIORITY priority); - /** - * Sets detached mode of threads - */ - virtual void setDetached(bool detached); - - /** - * Gets current detached mode - */ - virtual bool isDetached() const; - private: - class Impl; - boost::shared_ptr<Impl> impl_; + POLICY policy_; + PRIORITY priority_; + int stackSize_; }; } } diff --git a/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp b/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp index d57e7ecbc..66c7e7519 100644 --- a/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp +++ b/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp @@ -116,53 +116,17 @@ void StdThread::threadMain(boost::shared_ptr<StdThread> thread) { return; } -/** - * std::thread factory implementation - */ -class StdThreadFactory::Impl { - -private: - bool detached_; - -public: - Impl(bool detached) : detached_(detached) {} - - /** - * Creates a new std::thread to run the runnable object - * - * @param runnable A runnable object - */ - boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const { - boost::shared_ptr<StdThread> result - = boost::shared_ptr<StdThread>(new StdThread(detached_, runnable)); - runnable->thread(result); - return result; - } - - bool isDetached() const { return detached_; } - - void setDetached(bool value) { detached_ = value; } - - Thread::id_t getCurrentThreadId() const { return std::this_thread::get_id(); } -}; - -StdThreadFactory::StdThreadFactory(bool detached) : impl_(new StdThreadFactory::Impl(detached)) { +StdThreadFactory::StdThreadFactory(bool detached) : ThreadFactory(detached) { } boost::shared_ptr<Thread> StdThreadFactory::newThread(boost::shared_ptr<Runnable> runnable) const { - return impl_->newThread(runnable); -} - -bool StdThreadFactory::isDetached() const { - return impl_->isDetached(); -} - -void StdThreadFactory::setDetached(bool value) { - impl_->setDetached(value); + boost::shared_ptr<StdThread> result = boost::shared_ptr<StdThread>(new StdThread(isDetached(), runnable)); + runnable->thread(result); + return result; } Thread::id_t StdThreadFactory::getCurrentThreadId() const { - return impl_->getCurrentThreadId(); + return std::this_thread::get_id(); } } } diff --git a/lib/cpp/src/thrift/concurrency/StdThreadFactory.h b/lib/cpp/src/thrift/concurrency/StdThreadFactory.h index fb86bbf4b..88f00bea4 100644 --- a/lib/cpp/src/thrift/concurrency/StdThreadFactory.h +++ b/lib/cpp/src/thrift/concurrency/StdThreadFactory.h @@ -52,21 +52,8 @@ public: // From ThreadFactory; Thread::id_t getCurrentThreadId() const; - - /** - * Sets detached mode of threads - */ - virtual void setDetached(bool detached); - - /** - * Gets current detached mode - */ - virtual bool isDetached() const; - -private: - class Impl; - boost::shared_ptr<Impl> impl_; }; + } } } // apache::thrift::concurrency diff --git a/lib/cpp/src/thrift/concurrency/Thread.h b/lib/cpp/src/thrift/concurrency/Thread.h index f7c7bd618..2e154899f 100644 --- a/lib/cpp/src/thrift/concurrency/Thread.h +++ b/lib/cpp/src/thrift/concurrency/Thread.h @@ -108,8 +108,9 @@ public: virtual void start() = 0; /** - * Join this thread. Current thread blocks until this target thread - * completes. + * Join this thread. If this thread is joinable, the calling thread blocks + * until this thread completes. If the target thread is not joinable, then + * nothing happens. */ virtual void join() = 0; @@ -135,30 +136,39 @@ private: * object for execution */ class ThreadFactory { +protected: + ThreadFactory(bool detached) : detached_(detached) { } + public: - virtual ~ThreadFactory() {} + virtual ~ThreadFactory() { } /** * Gets current detached mode */ - virtual bool isDetached() const = 0; + bool isDetached() const { return detached_; } /** - * Create a new thread. + * Sets the detached disposition of newly created threads. */ - virtual boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const = 0; + void setDetached(bool detached) { detached_ = detached; } /** - * Sets detached mode of threads + * Create a new thread. */ - virtual void setDetached(bool detached) = 0; - - static const Thread::id_t unknown_thread_id; + virtual boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const = 0; /** * Gets the current thread id or unknown_thread_id if the current thread is not a thrift thread */ virtual Thread::id_t getCurrentThreadId() const = 0; + + /** + * For code readability define the unknown/undefined thread id + */ + static const Thread::id_t unknown_thread_id; + +private: + bool detached_; }; } diff --git a/lib/cpp/src/thrift/concurrency/ThreadManager.cpp b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp index 24bfeec44..c4726dde3 100644 --- a/lib/cpp/src/thrift/concurrency/ThreadManager.cpp +++ b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp @@ -26,8 +26,8 @@ #include <boost/shared_ptr.hpp> -#include <assert.h> -#include <queue> +#include <stdexcept> +#include <deque> #include <set> #if defined(DEBUG) @@ -49,6 +49,9 @@ using boost::dynamic_pointer_cast; * it maintains statistics on number of idle threads, number of active threads, * task backlog, and average wait and service times. * + * There are three different monitors used for signaling different conditions + * however they all share the same mutex_. + * * @version $Id:$ */ class ThreadManager::Impl : public ThreadManager { @@ -62,25 +65,26 @@ public: expiredCount_(0), state_(ThreadManager::UNINITIALIZED), monitor_(&mutex_), - maxMonitor_(&mutex_) {} + maxMonitor_(&mutex_), + workerMonitor_(&mutex_) {} ~Impl() { stop(); } void start(); - - void stop() { stopImpl(false); } - - void join() { stopImpl(true); } + void stop(); ThreadManager::STATE state() const { return state_; } shared_ptr<ThreadFactory> threadFactory() const { - Synchronized s(monitor_); + Guard g(mutex_); return threadFactory_; } void threadFactory(shared_ptr<ThreadFactory> value) { - Synchronized s(monitor_); + Guard g(mutex_); + if (threadFactory_ && threadFactory_->isDetached() != value->isDetached()) { + throw InvalidArgumentException(); + } threadFactory_ = value; } @@ -91,51 +95,65 @@ public: size_t idleWorkerCount() const { return idleCount_; } size_t workerCount() const { - Synchronized s(monitor_); + Guard g(mutex_); return workerCount_; } size_t pendingTaskCount() const { - Synchronized s(monitor_); + Guard g(mutex_); return tasks_.size(); } size_t totalTaskCount() const { - Synchronized s(monitor_); + Guard g(mutex_); return tasks_.size() + workerCount_ - idleCount_; } size_t pendingTaskCountMax() const { - Synchronized s(monitor_); + Guard g(mutex_); return pendingTaskCountMax_; } size_t expiredTaskCount() { - Synchronized s(monitor_); - size_t result = expiredCount_; - expiredCount_ = 0; - return result; + Guard g(mutex_); + return expiredCount_; } void pendingTaskCountMax(const size_t value) { - Synchronized s(monitor_); + Guard g(mutex_); pendingTaskCountMax_ = value; } - bool canSleep(); - void add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration); void remove(shared_ptr<Runnable> task); shared_ptr<Runnable> removeNextPending(); - void removeExpiredTasks(); + void removeExpiredTasks() { + removeExpired(false); + } void setExpireCallback(ExpireCallback expireCallback); private: - void stopImpl(bool join); + /** + * Remove one or more expired tasks. + * \param[in] justOne if true, try to remove just one task and return + */ + void removeExpired(bool justOne); + + /** + * \returns whether it is acceptable to block, depending on the current thread id + */ + bool canSleep() const; + + /** + * Lowers the maximum worker count and blocks until enough worker threads complete + * to get to the new maximum worker limit. The caller is responsible for acquiring + * a lock on the class mutex_. + */ + void removeWorkersUnderLock(size_t value); size_t workerCount_; size_t workerMaxCount_; @@ -148,11 +166,12 @@ private: shared_ptr<ThreadFactory> threadFactory_; friend class ThreadManager::Task; - std::queue<shared_ptr<Task> > tasks_; + typedef std::deque<shared_ptr<Task> > TaskQueue; + TaskQueue tasks_; Mutex mutex_; Monitor monitor_; Monitor maxMonitor_; - Monitor workerMonitor_; + Monitor workerMonitor_; // used to synchronize changes in worker count friend class ThreadManager::Worker; std::set<shared_ptr<Thread> > workers_; @@ -163,7 +182,7 @@ private: class ThreadManager::Task : public Runnable { public: - enum STATE { WAITING, EXECUTING, CANCELLED, COMPLETE }; + enum STATE { WAITING, EXECUTING, TIMEDOUT, COMPLETE }; Task(shared_ptr<Runnable> runnable, int64_t expiration = 0LL) : runnable_(runnable), @@ -194,7 +213,7 @@ class ThreadManager::Worker : public Runnable { enum STATE { UNINITIALIZED, STARTING, STARTED, STOPPING, STOPPED }; public: - Worker(ThreadManager::Impl* manager) : manager_(manager), state_(UNINITIALIZED), idle_(false) {} + Worker(ThreadManager::Impl* manager) : manager_(manager), state_(UNINITIALIZED) {} ~Worker() {} @@ -212,78 +231,82 @@ public: * execute. */ void run() { - bool active = false; + Guard g(manager_->mutex_); + + /** + * This method has three parts; one is to check for and account for + * admitting a task which happens under a lock. Then the lock is released + * and the task itself is executed. Finally we do some accounting + * under lock again when the task completes. + */ + + /** + * Admitting + */ + /** * Increment worker semaphore and notify manager if worker count reached * desired max - * - * Note: We have to release the monitor and acquire the workerMonitor - * since that is what the manager blocks on for worker add/remove */ - { - bool notifyManager = false; - { - Synchronized s(manager_->monitor_); - active = manager_->workerCount_ < manager_->workerMaxCount_; - if (active) { - manager_->workerCount_++; - notifyManager = manager_->workerCount_ == manager_->workerMaxCount_; - } - } - - if (notifyManager) { - Synchronized s(manager_->workerMonitor_); + bool active = manager_->workerCount_ < manager_->workerMaxCount_; + if (active) { + if (++manager_->workerCount_ == manager_->workerMaxCount_) { manager_->workerMonitor_.notify(); } } while (active) { - shared_ptr<ThreadManager::Task> task; - /** - * While holding manager monitor block for non-empty task queue (Also - * check that the thread hasn't been requested to stop). Once the queue - * is non-empty, dequeue a task, release monitor, and execute. If the - * worker max count has been decremented such that we exceed it, mark - * ourself inactive, decrement the worker count and notify the manager - * (technically we're notifying the next blocked thread but eventually - * the manager will see it. - */ - { - Guard g(manager_->mutex_); + * While holding manager monitor block for non-empty task queue (Also + * check that the thread hasn't been requested to stop). Once the queue + * is non-empty, dequeue a task, release monitor, and execute. If the + * worker max count has been decremented such that we exceed it, mark + * ourself inactive, decrement the worker count and notify the manager + * (technically we're notifying the next blocked thread but eventually + * the manager will see it. + */ + active = isActive(); + + while (active && manager_->tasks_.empty()) { + manager_->idleCount_++; + manager_->monitor_.wait(); active = isActive(); + manager_->idleCount_--; + } - while (active && manager_->tasks_.empty()) { - manager_->idleCount_++; - idle_ = true; - manager_->monitor_.wait(); - active = isActive(); - idle_ = false; - manager_->idleCount_--; - } - - if (active) { - manager_->removeExpiredTasks(); + shared_ptr<ThreadManager::Task> task; - if (!manager_->tasks_.empty()) { - task = manager_->tasks_.front(); - manager_->tasks_.pop(); - if (task->state_ == ThreadManager::Task::WAITING) { - task->state_ = ThreadManager::Task::EXECUTING; - } + if (active) { + if (!manager_->tasks_.empty()) { + task = manager_->tasks_.front(); + manager_->tasks_.pop_front(); + if (task->state_ == ThreadManager::Task::WAITING) { + // If the state is changed to anything other than EXECUTING or TIMEDOUT here + // then the execution loop needs to be changed below. + task->state_ = + (task->getExpireTime() && task->getExpireTime() < Util::currentTime()) ? + ThreadManager::Task::TIMEDOUT : + ThreadManager::Task::EXECUTING; } + } - /* If we have a pending task max and we just dropped below it, wakeup any - thread that might be blocked on add. */ - if (manager_->pendingTaskCountMax_ != 0 - && manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) { - manager_->maxMonitor_.notify(); - } + /* If we have a pending task max and we just dropped below it, wakeup any + thread that might be blocked on add. */ + if (manager_->pendingTaskCountMax_ != 0 + && manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) { + manager_->maxMonitor_.notify(); } } + /** + * Execution - not holding a lock + */ if (task) { if (task->state_ == ThreadManager::Task::EXECUTING) { + + // Release the lock so we can run the task without blocking the thread manager + manager_->mutex_.unlock(); + try { task->run(); } catch (const std::exception& e) { @@ -291,29 +314,31 @@ public: } catch (...) { GlobalOutput.printf("[ERROR] task->run() raised an unknown exception"); } + + // Re-acquire the lock to proceed in the thread manager + manager_->mutex_.lock(); + + } else if (manager_->expireCallback_) { + // The only other state the task could have been in is TIMEDOUT (see above) + manager_->expireCallback_(task->getRunnable()); + manager_->expiredCount_++; } } } - { - Synchronized s(manager_->workerMonitor_); - manager_->deadWorkers_.insert(this->thread()); - idle_ = true; - manager_->workerCount_--; - bool notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_); - if (notifyManager) { - manager_->workerMonitor_.notify(); - } + /** + * Final accounting for the worker thread that is done working + */ + manager_->deadWorkers_.insert(this->thread()); + if (--manager_->workerCount_ == manager_->workerMaxCount_) { + manager_->workerMonitor_.notify(); } - - return; } private: ThreadManager::Impl* manager_; friend class ThreadManager::Impl; STATE state_; - bool idle_; }; void ThreadManager::Impl::addWorker(size_t value) { @@ -324,11 +349,9 @@ void ThreadManager::Impl::addWorker(size_t value) { newThreads.insert(threadFactory_->newThread(worker)); } - { - Synchronized s(monitor_); - workerMaxCount_ += value; - workers_.insert(newThreads.begin(), newThreads.end()); - } + Guard g(mutex_); + workerMaxCount_ += value; + workers_.insert(newThreads.begin(), newThreads.end()); for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ++ix) { @@ -339,103 +362,92 @@ void ThreadManager::Impl::addWorker(size_t value) { idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix)); } - { - Synchronized s(workerMonitor_); - while (workerCount_ != workerMaxCount_) { - workerMonitor_.wait(); - } + while (workerCount_ != workerMaxCount_) { + workerMonitor_.wait(); } } void ThreadManager::Impl::start() { - + Guard g(mutex_); if (state_ == ThreadManager::STOPPED) { return; } - { - Synchronized s(monitor_); - if (state_ == ThreadManager::UNINITIALIZED) { - if (!threadFactory_) { - throw InvalidArgumentException(); - } - state_ = ThreadManager::STARTED; - monitor_.notifyAll(); + if (state_ == ThreadManager::UNINITIALIZED) { + if (!threadFactory_) { + throw InvalidArgumentException(); } + state_ = ThreadManager::STARTED; + monitor_.notifyAll(); + } - while (state_ == STARTING) { - monitor_.wait(); - } + while (state_ == STARTING) { + monitor_.wait(); } } -void ThreadManager::Impl::stopImpl(bool join) { +void ThreadManager::Impl::stop() { + Guard g(mutex_); bool doStop = false; - if (state_ == ThreadManager::STOPPED) { - return; - } - { - Synchronized s(monitor_); - if (state_ != ThreadManager::STOPPING && state_ != ThreadManager::JOINING - && state_ != ThreadManager::STOPPED) { - doStop = true; - state_ = join ? ThreadManager::JOINING : ThreadManager::STOPPING; - } + if (state_ != ThreadManager::STOPPING && state_ != ThreadManager::JOINING + && state_ != ThreadManager::STOPPED) { + doStop = true; + state_ = ThreadManager::JOINING; } if (doStop) { - removeWorker(workerCount_); + removeWorkersUnderLock(workerCount_); } - // XXX - // should be able to block here for transition to STOPPED since we're no - // using shared_ptrs - - { - Synchronized s(monitor_); - state_ = ThreadManager::STOPPED; - } + state_ = ThreadManager::STOPPED; } void ThreadManager::Impl::removeWorker(size_t value) { - std::set<shared_ptr<Thread> > removedThreads; - { - Synchronized s(monitor_); - if (value > workerMaxCount_) { - throw InvalidArgumentException(); - } + Guard g(mutex_); + removeWorkersUnderLock(value); +} + +void ThreadManager::Impl::removeWorkersUnderLock(size_t value) { + if (value > workerMaxCount_) { + throw InvalidArgumentException(); + } - workerMaxCount_ -= value; + workerMaxCount_ -= value; - if (idleCount_ < value) { - for (size_t ix = 0; ix < idleCount_; ix++) { - monitor_.notify(); - } - } else { - monitor_.notifyAll(); + if (idleCount_ > value) { + // There are more idle workers than we need to remove, + // so notify enough of them so they can terminate. + for (size_t ix = 0; ix < value; ix++) { + monitor_.notify(); } + } else { + // There are as many or less idle workers than we need to remove, + // so just notify them all so they can terminate. + monitor_.notifyAll(); } - { - Synchronized s(workerMonitor_); + while (workerCount_ != workerMaxCount_) { + workerMonitor_.wait(); + } - while (workerCount_ != workerMaxCount_) { - workerMonitor_.wait(); - } + for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); + ix != deadWorkers_.end(); + ++ix) { - for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); - ix != deadWorkers_.end(); - ++ix) { - idMap_.erase((*ix)->getId()); - workers_.erase(*ix); + // when used with a joinable thread factory, we join the threads as we remove them + if (!threadFactory_->isDetached()) { + (*ix)->join(); } - deadWorkers_.clear(); + idMap_.erase((*ix)->getId()); + workers_.erase(*ix); } + + deadWorkers_.clear(); } -bool ThreadManager::Impl::canSleep() { +bool ThreadManager::Impl::canSleep() const { const Thread::id_t id = threadFactory_->getCurrentThreadId(); return idMap_.find(id) == idMap_.end(); } @@ -453,7 +465,11 @@ void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout, int64 "not started"); } - removeExpiredTasks(); + // if we're at a limit, remove an expired task to see if the limit clears + if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) { + removeExpired(true); + } + if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) { if (canSleep() && timeout >= 0) { while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) { @@ -465,7 +481,7 @@ void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout, int64 } } - tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value, expiration))); + tasks_.push_back(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value, expiration))); // If idle thread is available notify it, otherwise all worker threads are // running and will get around to this task in time. @@ -475,13 +491,21 @@ void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout, int64 } void ThreadManager::Impl::remove(shared_ptr<Runnable> task) { - (void)task; - Synchronized s(monitor_); + Guard g(mutex_); if (state_ != ThreadManager::STARTED) { throw IllegalStateException( "ThreadManager::Impl::remove ThreadManager not " "started"); } + + for (TaskQueue::iterator it = tasks_.begin(); it != tasks_.end(); ++it) + { + if ((*it)->getRunnable() == task) + { + tasks_.erase(it); + return; + } + } } boost::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() { @@ -497,35 +521,40 @@ boost::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() { } shared_ptr<ThreadManager::Task> task = tasks_.front(); - tasks_.pop(); + tasks_.pop_front(); return task->getRunnable(); } -void ThreadManager::Impl::removeExpiredTasks() { - int64_t now = 0LL; // we won't ask for the time untile we need it +void ThreadManager::Impl::removeExpired(bool justOne) { + // this is always called under a lock + int64_t now = 0LL; - // note that this loop breaks at the first non-expiring task - while (!tasks_.empty()) { - shared_ptr<ThreadManager::Task> task = tasks_.front(); - if (task->getExpireTime() == 0LL) { - break; - } + for (TaskQueue::iterator it = tasks_.begin(); it != tasks_.end(); ) + { if (now == 0LL) { now = Util::currentTime(); } - if (task->getExpireTime() > now) { - break; + + if ((*it)->getExpireTime() > 0LL && (*it)->getExpireTime() < now) { + if (expireCallback_) { + expireCallback_((*it)->getRunnable()); + } + it = tasks_.erase(it); + ++expiredCount_; + if (justOne) { + return; + } } - if (expireCallback_) { - expireCallback_(task->getRunnable()); + else + { + ++it; } - tasks_.pop(); - expiredCount_++; } } void ThreadManager::Impl::setExpireCallback(ExpireCallback expireCallback) { + Guard g(mutex_); expireCallback_ = expireCallback; } @@ -544,7 +573,6 @@ public: private: const size_t workerCount_; const size_t pendingTaskCountMax_; - Monitor monitor_; }; shared_ptr<ThreadManager> ThreadManager::newThreadManager() { diff --git a/lib/cpp/src/thrift/concurrency/ThreadManager.h b/lib/cpp/src/thrift/concurrency/ThreadManager.h index 2112845da..d8bf71b8d 100644 --- a/lib/cpp/src/thrift/concurrency/ThreadManager.h +++ b/lib/cpp/src/thrift/concurrency/ThreadManager.h @@ -71,30 +71,45 @@ public: /** * Stops the thread manager. Aborts all remaining unprocessed task, shuts - * down all created worker threads, and realeases all allocated resources. + * down all created worker threads, and releases all allocated resources. * This method blocks for all worker threads to complete, thus it can * potentially block forever if a worker thread is running a task that * won't terminate. + * + * Worker threads will be joined depending on the threadFactory's detached + * disposition. */ virtual void stop() = 0; - /** - * Joins the thread manager. This is the same as stop, except that it will - * block until all the workers have finished their work. At that point - * the ThreadManager will transition into the STOPPED state. - */ - virtual void join() = 0; - enum STATE { UNINITIALIZED, STARTING, STARTED, JOINING, STOPPING, STOPPED }; virtual STATE state() const = 0; + /** + * \returns the current thread factory + */ virtual boost::shared_ptr<ThreadFactory> threadFactory() const = 0; + /** + * Set the thread factory. + * \throws InvalidArgumentException if the new thread factory has a different + * detached disposition than the one replacing it + */ virtual void threadFactory(boost::shared_ptr<ThreadFactory> value) = 0; + /** + * Adds worker thread(s). + */ virtual void addWorker(size_t value = 1) = 0; + /** + * Removes worker thread(s). + * Threads are joined if the thread factory detached disposition allows it. + * Blocks until the number of worker threads reaches the new limit. + * \param[in] value the number to remove + * \throws InvalidArgumentException if the value is greater than the number + * of workers + */ virtual void removeWorker(size_t value = 1) = 0; /** @@ -123,7 +138,8 @@ public: virtual size_t pendingTaskCountMax() const = 0; /** - * Gets the number of tasks which have been expired without being run. + * Gets the number of tasks which have been expired without being run + * since start() was called. */ virtual size_t expiredTaskCount() = 0; diff --git a/lib/cpp/src/thrift/concurrency/Util.h b/lib/cpp/src/thrift/concurrency/Util.h index ba070b62e..1a915993f 100644 --- a/lib/cpp/src/thrift/concurrency/Util.h +++ b/lib/cpp/src/thrift/concurrency/Util.h @@ -98,7 +98,7 @@ public: * Converts struct timeval to arbitrary-sized ticks since epoch */ static void toTicks(int64_t& result, const struct timeval& value, int64_t ticksPerSec) { - return toTicks(result, value.tv_sec, value.tv_usec, US_PER_S, ticksPerSec); + return toTicks(result, (unsigned long)value.tv_sec, (unsigned long)value.tv_usec, US_PER_S, ticksPerSec); } /** diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp index b81a522b2..63af85cb9 100644 --- a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp +++ b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp @@ -96,7 +96,7 @@ TThreadPoolServer::~TThreadPoolServer() { void TThreadPoolServer::serve() { TServerFramework::serve(); - threadManager_->join(); + threadManager_->stop(); } int64_t TThreadPoolServer::getTimeout() const { diff --git a/lib/cpp/src/thrift/server/TThreadedServer.cpp b/lib/cpp/src/thrift/server/TThreadedServer.cpp index e15f8f1b0..9de1db89e 100644 --- a/lib/cpp/src/thrift/server/TThreadedServer.cpp +++ b/lib/cpp/src/thrift/server/TThreadedServer.cpp @@ -95,7 +95,6 @@ TThreadedServer::~TThreadedServer() { } void TThreadedServer::serve() { - threadFactory_->setDetached(false); TServerFramework::serve(); // Ensure post-condition of no active clients @@ -126,7 +125,7 @@ void TThreadedServer::onClientConnected(const shared_ptr<TConnectedClient>& pCli void TThreadedServer::onClientDisconnected(TConnectedClient* pClient) { Synchronized sync(clientMonitor_); - drainDeadClients(); // use the outgoing thread to do some maintenance on our dead client backlog + drainDeadClients(); // use the outgoing thread to do some maintenance on our dead client backlog ClientMap::iterator it = activeClientMap_.find(pClient); ClientMap::iterator end = it; deadClientMap_.insert(it, ++end); @@ -153,7 +152,7 @@ void TThreadedServer::TConnectedClientRunner::run() /* override */ { } void TThreadedServer::TConnectedClientRunner::setThread( - const boost::shared_ptr<apache::thrift::concurrency::Thread>& pThread) { + const boost::shared_ptr<apache::thrift::concurrency::Thread>& pThread) { pThread_ = pThread; } diff --git a/lib/cpp/src/thrift/server/TThreadedServer.h b/lib/cpp/src/thrift/server/TThreadedServer.h index 0f2cce9fc..758d1d99e 100644 --- a/lib/cpp/src/thrift/server/TThreadedServer.h +++ b/lib/cpp/src/thrift/server/TThreadedServer.h @@ -34,8 +34,6 @@ namespace server { * Manage clients using threads - threads are created one for each client and are * released when the client disconnects. This server is used to make a dynamically * scalable server up to the concurrent connection limit. - * - * The thread factory will be changed to a non-detached type. */ class TThreadedServer : public TServerFramework { public: @@ -46,7 +44,7 @@ public: const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory, const boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory = boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>( - new apache::thrift::concurrency::PlatformThreadFactory)); + new apache::thrift::concurrency::PlatformThreadFactory(false))); TThreadedServer( const boost::shared_ptr<apache::thrift::TProcessor>& processor, @@ -55,7 +53,7 @@ public: const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory, const boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory = boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>( - new apache::thrift::concurrency::PlatformThreadFactory)); + new apache::thrift::concurrency::PlatformThreadFactory(false))); TThreadedServer( const boost::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory, @@ -66,7 +64,7 @@ public: const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory, const boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory = boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>( - new apache::thrift::concurrency::PlatformThreadFactory)); + new apache::thrift::concurrency::PlatformThreadFactory(false))); TThreadedServer( const boost::shared_ptr<apache::thrift::TProcessor>& processor, @@ -77,7 +75,7 @@ public: const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory, const boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory = boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>( - new apache::thrift::concurrency::PlatformThreadFactory)); + new apache::thrift::concurrency::PlatformThreadFactory(false))); virtual ~TThreadedServer(); diff --git a/lib/cpp/test/TPipeInterruptTest.cpp b/lib/cpp/test/TPipeInterruptTest.cpp index e2f248967..80e4c1fea 100644 --- a/lib/cpp/test/TPipeInterruptTest.cpp +++ b/lib/cpp/test/TPipeInterruptTest.cpp @@ -63,7 +63,7 @@ static void interruptWorker(TPipeServer *pipe) { } BOOST_AUTO_TEST_CASE(stress_pipe_accept_interruption) { - int interruptIters = 100; + int interruptIters = 10; for (int i = 0; i < interruptIters; ++i) { diff --git a/lib/cpp/test/TServerIntegrationTest.cpp b/lib/cpp/test/TServerIntegrationTest.cpp index 21804484b..fd7bae2ff 100644 --- a/lib/cpp/test/TServerIntegrationTest.cpp +++ b/lib/cpp/test/TServerIntegrationTest.cpp @@ -156,9 +156,9 @@ public: boost::shared_ptr<TTransportFactory>(new TTransportFactory), boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))), pEventHandler(boost::shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler)), - bStressDone(false), - bStressConnectionCount(0), - bStressRequestCount(0) { + bStressDone(false), + bStressConnectionCount(0), + bStressRequestCount(0) { pServer->setServerEventHandler(pEventHandler); } @@ -170,8 +170,8 @@ public: boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))), pEventHandler(boost::shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler)), bStressDone(false), - bStressConnectionCount(0), - bStressRequestCount(0) { + bStressConnectionCount(0), + bStressRequestCount(0) { pServer->setServerEventHandler(pEventHandler); } @@ -217,10 +217,10 @@ public: * \param[in] purpose a description of the test for logging purposes */ void baseline(int64_t numToMake, int64_t expectedHWM, const std::string& purpose) { - BOOST_TEST_MESSAGE(boost::format("Testing %1%: %2% with %3% clients, expect %4% HWM") - % typeid(TServerType).name() % purpose % numToMake % expectedHWM); + BOOST_TEST_MESSAGE(boost::format("Testing %1%: %2% with %3% clients, expect %4% HWM") + % typeid(TServerType).name() % purpose % numToMake % expectedHWM); - startServer(); + startServer(); std::vector<boost::shared_ptr<TSocket> > holdSockets; std::vector<boost::shared_ptr<boost::thread> > holdThreads; @@ -303,14 +303,14 @@ public: * Helper method to stress the system */ void stressor() { - while (!bStressDone) { + while (!bStressDone) { boost::shared_ptr<TSocket> pSocket(new TSocket("localhost", getServerPort()), autoSocketCloser); boost::shared_ptr<TProtocol> pProtocol(new TBinaryProtocol(pSocket)); ParentServiceClient client(pProtocol); pSocket->open(); bStressConnectionCount.fetch_add(1, boost::memory_order_relaxed); for (int i = 0; i < rand() % 1000; ++i) { - client.incrementGeneration(); + client.incrementGeneration(); bStressRequestCount.fetch_add(1, boost::memory_order_relaxed); } } @@ -459,7 +459,7 @@ BOOST_AUTO_TEST_CASE(test_stop_with_interruptable_clients_connected) { BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected) { // This tests pre-THRIFT-2441 behavior: stopping the server blocks until clients // disconnect. - BOOST_TEST_MESSAGE("Testing stop with uninterruptable clients"); + BOOST_TEST_MESSAGE("Testing stop with uninterruptable clients"); boost::dynamic_pointer_cast<TServerSocket>(pServer->getServerTransport()) ->setInterruptableChildren(false); // returns to pre-THRIFT-2441 behavior diff --git a/lib/cpp/test/concurrency/Tests.cpp b/lib/cpp/test/concurrency/Tests.cpp index 0d81d7eeb..33af39281 100644 --- a/lib/cpp/test/concurrency/Tests.cpp +++ b/lib/cpp/test/concurrency/Tests.cpp @@ -45,25 +45,38 @@ int main(int argc, char** argv) { std::cout << "ThreadFactory tests..." << std::endl; - size_t count = 1000; - size_t floodLoops = 1; - size_t floodCount = 100000; + int reapLoops = 20; + int reapCount = 1000; + size_t floodLoops = 3; + size_t floodCount = 20000; - std::cout << "\t\tThreadFactory reap N threads test: N = " << count << std::endl; + std::cout << "\t\tThreadFactory reap N threads test: N = " << reapLoops << "x" << reapCount << std::endl; - assert(threadFactoryTests.reapNThreads(count)); + if (!threadFactoryTests.reapNThreads(reapLoops, reapCount)) { + std::cerr << "\t\ttThreadFactory reap N threads FAILED" << std::endl; + return 1; + } - std::cout << "\t\tThreadFactory floodN threads test: N = " << floodCount << std::endl; + std::cout << "\t\tThreadFactory flood N threads test: N = " << floodLoops << "x" << floodCount << std::endl; - assert(threadFactoryTests.floodNTest(floodLoops, floodCount)); + if (!threadFactoryTests.floodNTest(floodLoops, floodCount)) { + std::cerr << "\t\ttThreadFactory flood N threads FAILED" << std::endl; + return 1; + } std::cout << "\t\tThreadFactory synchronous start test" << std::endl; - assert(threadFactoryTests.synchStartTest()); + if (!threadFactoryTests.synchStartTest()) { + std::cerr << "\t\ttThreadFactory synchronous start FAILED" << std::endl; + return 1; + } std::cout << "\t\tThreadFactory monitor timeout test" << std::endl; - assert(threadFactoryTests.monitorTimeoutTest()); + if (!threadFactoryTests.monitorTimeoutTest()) { + std::cerr << "\t\ttThreadFactory monitor timeout FAILED" << std::endl; + return 1; + } } if (runAll || args[0].compare("util") == 0) { @@ -97,7 +110,10 @@ int main(int argc, char** argv) { TimerManagerTests timerManagerTests; - assert(timerManagerTests.test00()); + if (!timerManagerTests.test00()) { + std::cerr << "\t\tTimerManager tests FAILED" << std::endl; + return 1; + } } if (runAll || args[0].compare("thread-manager") == 0) { @@ -105,24 +121,34 @@ int main(int argc, char** argv) { std::cout << "ThreadManager tests..." << std::endl; { - size_t workerCount = 100; + size_t taskCount = 50000; + int64_t delay = 10LL; - size_t taskCount = 100000; + ThreadManagerTests threadManagerTests; - int64_t delay = 10LL; + std::cout << "\t\tThreadManager api test:" << std::endl; + + if (!threadManagerTests.apiTest()) { + std::cerr << "\t\tThreadManager apiTest FAILED" << std::endl; + return 1; + } std::cout << "\t\tThreadManager load test: worker count: " << workerCount << " task count: " << taskCount << " delay: " << delay << std::endl; - ThreadManagerTests threadManagerTests; - - assert(threadManagerTests.loadTest(taskCount, delay, workerCount)); + if (!threadManagerTests.loadTest(taskCount, delay, workerCount)) { + std::cerr << "\t\tThreadManager loadTest FAILED" << std::endl; + return 1; + } std::cout << "\t\tThreadManager block test: worker count: " << workerCount << " delay: " << delay << std::endl; - assert(threadManagerTests.blockTest(delay, workerCount)); + if (!threadManagerTests.blockTest(delay, workerCount)) { + std::cerr << "\t\tThreadManager blockTest FAILED" << std::endl; + return 1; + } } } @@ -134,13 +160,13 @@ int main(int argc, char** argv) { size_t minWorkerCount = 2; - size_t maxWorkerCount = 512; + size_t maxWorkerCount = 64; size_t tasksPerWorker = 1000; - int64_t delay = 10LL; + int64_t delay = 5LL; - for (size_t workerCount = minWorkerCount; workerCount < maxWorkerCount; workerCount *= 2) { + for (size_t workerCount = minWorkerCount; workerCount < maxWorkerCount; workerCount *= 4) { size_t taskCount = workerCount * tasksPerWorker; @@ -149,8 +175,15 @@ int main(int argc, char** argv) { ThreadManagerTests threadManagerTests; - threadManagerTests.loadTest(taskCount, delay, workerCount); + if (!threadManagerTests.loadTest(taskCount, delay, workerCount)) + { + std::cerr << "\t\tThreadManager loadTest FAILED" << std::endl; + return 1; + } } } } + + std::cout << "ALL TESTS PASSED" << std::endl; + return 0; } diff --git a/lib/cpp/test/concurrency/ThreadFactoryTests.h b/lib/cpp/test/concurrency/ThreadFactoryTests.h index 3ad14cac3..4fc688cc5 100644 --- a/lib/cpp/test/concurrency/ThreadFactoryTests.h +++ b/lib/cpp/test/concurrency/ThreadFactoryTests.h @@ -43,36 +43,6 @@ using namespace apache::thrift::concurrency; class ThreadFactoryTests { public: - static const double TEST_TOLERANCE; - - class Task : public Runnable { - - public: - Task() {} - - void run() { std::cout << "\t\t\tHello World" << std::endl; } - }; - - /** - * Hello world test - */ - bool helloWorldTest() { - - PlatformThreadFactory threadFactory = PlatformThreadFactory(); - - shared_ptr<Task> task = shared_ptr<Task>(new ThreadFactoryTests::Task()); - - shared_ptr<Thread> thread = threadFactory.newThread(task); - - thread->start(); - - thread->join(); - - std::cout << "\t\t\tSuccess!" << std::endl; - - return true; - } - /** * Reap N threads */ @@ -244,15 +214,22 @@ public: return true; } - /** See how accurate monitor timeout is. */ + /** + * The only guarantee a monitor timeout can give you is that + * it will take "at least" as long as the timeout, no less. + * There is absolutely no guarantee around regaining execution + * near the timeout. On a busy system (like inside a third party + * CI environment) it could take quite a bit longer than the + * requested timeout, and that's ok. + */ - bool monitorTimeoutTest(size_t count = 1000, int64_t timeout = 10) { + bool monitorTimeoutTest(int64_t count = 1000, int64_t timeout = 2) { Monitor monitor; int64_t startTime = Util::currentTime(); - for (size_t ix = 0; ix < count; ix++) { + for (int64_t ix = 0; ix < count; ix++) { { Synchronized s(monitor); try { @@ -264,18 +241,11 @@ public: int64_t endTime = Util::currentTime(); - double error = ((endTime - startTime) - (count * timeout)) / (double)(count * timeout); - - if (error < 0.0) { - - error *= 1.0; - } - - bool success = error < ThreadFactoryTests::TEST_TOLERANCE; + bool success = (endTime - startTime) >= (count * timeout); std::cout << "\t\t\t" << (success ? "Success" : "Failure") - << "! expected time: " << count * timeout - << "ms elapsed time: " << endTime - startTime << "ms error%: " << error * 100.0 + << ": minimum required time to elapse " << count * timeout + << "ms; actual elapsed time " << endTime - startTime << "ms" << std::endl; return success; @@ -285,17 +255,15 @@ public: public: FloodTask(const size_t id) : _id(id) {} ~FloodTask() { - if (_id % 1000 == 0) { + if (_id % 10000 == 0) { std::cout << "\t\tthread " << _id << " done" << std::endl; } } void run() { - if (_id % 1000 == 0) { + if (_id % 10000 == 0) { std::cout << "\t\tthread " << _id << " started" << std::endl; } - - THRIFT_SLEEP_USEC(1); } const size_t _id; }; @@ -321,8 +289,6 @@ public: thread->start(); - THRIFT_SLEEP_USEC(1); - } catch (TException& e) { std::cout << "\t\t\tfailed to start " << lix* count + tix << " thread " << e.what() @@ -341,7 +307,6 @@ public: } }; -const double ThreadFactoryTests::TEST_TOLERANCE = .20; } } } diff --git a/lib/cpp/test/concurrency/ThreadManagerTests.h b/lib/cpp/test/concurrency/ThreadManagerTests.h index b1968135d..b5925ac2f 100644 --- a/lib/cpp/test/concurrency/ThreadManagerTests.h +++ b/lib/cpp/test/concurrency/ThreadManagerTests.h @@ -24,9 +24,9 @@ #include <thrift/concurrency/Util.h> #include <assert.h> +#include <deque> #include <set> #include <iostream> -#include <set> #include <stdint.h> namespace apache { @@ -36,9 +36,26 @@ namespace test { using namespace apache::thrift::concurrency; -class ThreadManagerTests { +static std::deque<boost::shared_ptr<Runnable> > m_expired; +static void expiredNotifier(boost::shared_ptr<Runnable> runnable) +{ + m_expired.push_back(runnable); +} - static const double TEST_TOLERANCE; +static void sleep_(int64_t millisec) { + Monitor _sleep; + Synchronized s(_sleep); + + try { + _sleep.wait(millisec); + } catch (TimedOutException&) { + ; + } catch (...) { + assert(0); + } +} + +class ThreadManagerTests { public: class Task : public Runnable { @@ -51,17 +68,7 @@ public: _startTime = Util::currentTime(); - { - Synchronized s(_sleep); - - try { - _sleep.wait(_timeout); - } catch (TimedOutException&) { - ; - } catch (...) { - assert(0); - } - } + sleep_(_timeout); _endTime = Util::currentTime(); @@ -73,9 +80,7 @@ public: // std::cout << "Thread " << _count << " completed " << std::endl; _count--; - - if (_count == 0) { - + if (_count % 10000 == 0) { _monitor.notify(); } } @@ -130,11 +135,13 @@ public: threadManager->add(*ix); } + std::cout << "\t\t\t\tloaded " << count << " tasks to execute" << std::endl; + { Synchronized s(monitor); while (activeCount > 0) { - + std::cout << "\t\t\t\tactiveCount = " << activeCount << std::endl; monitor.wait(); } } @@ -179,23 +186,15 @@ public: averageTime /= count; - std::cout << "\t\t\tfirst start: " << firstTime << "ms Last end: " << lastTime - << "ms min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime + std::cout << "\t\t\tfirst start: " << firstTime << " Last end: " << lastTime + << " min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime << "ms" << std::endl; - double expectedTime = (double(count + (workerCount - 1)) / workerCount) * timeout; - - double error = ((time01 - time00) - expectedTime) / expectedTime; - - if (error < 0) { - error *= -1.0; - } - - bool success = error < TEST_TOLERANCE; + bool success = (time01 - time00) >= ((int64_t)count * timeout) / (int64_t)workerCount; std::cout << "\t\t\t" << (success ? "Success" : "Failure") - << "! expected time: " << expectedTime << "ms elapsed time: " << time01 - time00 - << "ms error%: " << error * 100.0 << std::endl; + << "! expected time: " << ((int64_t)count * timeout) / (int64_t)workerCount << "ms elapsed time: " << time01 - time00 + << "ms" << std::endl; return success; } @@ -203,30 +202,36 @@ public: class BlockTask : public Runnable { public: - BlockTask(Monitor& monitor, Monitor& bmonitor, size_t& count) - : _monitor(monitor), _bmonitor(bmonitor), _count(count) {} + BlockTask(Monitor& entryMonitor, Monitor& blockMonitor, bool& blocked, Monitor& doneMonitor, size_t& count) + : _entryMonitor(entryMonitor), _entered(false), _blockMonitor(blockMonitor), _blocked(blocked), _doneMonitor(doneMonitor), _count(count) {} void run() { { - Synchronized s(_bmonitor); - - _bmonitor.wait(); + Synchronized s(_entryMonitor); + _entered = true; + _entryMonitor.notify(); } { - Synchronized s(_monitor); - - _count--; - - if (_count == 0) { + Synchronized s(_blockMonitor); + while (_blocked) { + _blockMonitor.wait(); + } + } - _monitor.notify(); + { + Synchronized s(_doneMonitor); + if (--_count == 0) { + _doneMonitor.notify(); } } } - Monitor& _monitor; - Monitor& _bmonitor; + Monitor& _entryMonitor; + bool _entered; + Monitor& _blockMonitor; + bool& _blocked; + Monitor& _doneMonitor; size_t& _count; }; @@ -240,8 +245,10 @@ public: try { - Monitor bmonitor; - Monitor monitor; + Monitor entryMonitor; // not used by this test + Monitor blockMonitor; + bool blocked[] = {true, true, true}; + Monitor doneMonitor; size_t pendingTaskMaxCount = workerCount; @@ -260,21 +267,22 @@ public: threadManager->start(); - std::set<shared_ptr<ThreadManagerTests::BlockTask> > tasks; + std::vector<shared_ptr<ThreadManagerTests::BlockTask> > tasks; + tasks.reserve(workerCount + pendingTaskMaxCount); for (size_t ix = 0; ix < workerCount; ix++) { - tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>( - new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[0]))); + tasks.push_back(shared_ptr<ThreadManagerTests::BlockTask>( + new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[0], doneMonitor, activeCounts[0]))); } for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) { - tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>( - new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[1]))); + tasks.push_back(shared_ptr<ThreadManagerTests::BlockTask>( + new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[1], doneMonitor, activeCounts[1]))); } - for (std::set<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin(); + for (std::vector<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) { threadManager->add(*ix); @@ -285,7 +293,7 @@ public: } shared_ptr<ThreadManagerTests::BlockTask> extraTask( - new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[2])); + new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[2], doneMonitor, activeCounts[2])); try { threadManager->add(extraTask, 1); @@ -309,16 +317,15 @@ public: << "Pending tasks " << threadManager->pendingTaskCount() << std::endl; { - Synchronized s(bmonitor); - - bmonitor.notifyAll(); + Synchronized s(blockMonitor); + blocked[0] = false; + blockMonitor.notifyAll(); } { - Synchronized s(monitor); - + Synchronized s(doneMonitor); while (activeCounts[0] != 0) { - monitor.wait(); + doneMonitor.wait(); } } @@ -341,37 +348,37 @@ public: // Wake up tasks that were pending before and wait for them to complete { - Synchronized s(bmonitor); - - bmonitor.notifyAll(); + Synchronized s(blockMonitor); + blocked[1] = false; + blockMonitor.notifyAll(); } { - Synchronized s(monitor); - + Synchronized s(doneMonitor); while (activeCounts[1] != 0) { - monitor.wait(); + doneMonitor.wait(); } } // Wake up the extra task and wait for it to complete { - Synchronized s(bmonitor); - - bmonitor.notifyAll(); + Synchronized s(blockMonitor); + blocked[2] = false; + blockMonitor.notifyAll(); } { - Synchronized s(monitor); - + Synchronized s(doneMonitor); while (activeCounts[2] != 0) { - monitor.wait(); + doneMonitor.wait(); } } + threadManager->stop(); + if (!(success = (threadManager->totalTaskCount() == 0))) { - throw TException("Unexpected pending task count"); + throw TException("Unexpected total task count"); } } catch (TException& e) { @@ -381,9 +388,295 @@ public: std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl; return success; } + + + bool apiTest() { + + // prove currentTime has milliseconds granularity since many other things depend on it + int64_t a = Util::currentTime(); + sleep_(100); + int64_t b = Util::currentTime(); + if (b - a < 50 || b - a > 150) { + std::cerr << "\t\t\texpected 100ms gap, found " << (b-a) << "ms gap instead." << std::endl; + return false; + } + +#if !USE_BOOST_THREAD && !USE_STD_THREAD + // test once with a detached thread factory and once with a joinable thread factory + + shared_ptr<PosixThreadFactory> threadFactory + = shared_ptr<PosixThreadFactory>(new PosixThreadFactory(false)); + + std::cout << "\t\t\tapiTest with joinable thread factory" << std::endl; + if (!apiTestWithThreadFactory(threadFactory)) { + return false; + } + + threadFactory.reset(new PosixThreadFactory(true)); + std::cout << "\t\t\tapiTest with detached thread factory" << std::endl; + return apiTestWithThreadFactory(threadFactory); +#else + return apiTestWithThreadFactory(shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory())); +#endif + + } + + bool apiTestWithThreadFactory(shared_ptr<PlatformThreadFactory> threadFactory) + { + shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(1); + threadManager->threadFactory(threadFactory); + +#if !USE_BOOST_THREAD && !USE_STD_THREAD + threadFactory->setPriority(PosixThreadFactory::HIGHEST); + + // verify we cannot change the thread factory to one with the opposite detached setting + shared_ptr<PlatformThreadFactory> threadFactory2 + = shared_ptr<PosixThreadFactory>(new PlatformThreadFactory( + PosixThreadFactory::ROUND_ROBIN, + PosixThreadFactory::NORMAL, + 1, + !threadFactory->isDetached())); + try { + threadManager->threadFactory(threadFactory2); + // if the call succeeded we changed the thread factory to one that had the opposite setting for "isDetached()". + // this is bad, because the thread manager checks with the thread factory to see if it should join threads + // as they are leaving - so the detached status of new threads cannot change while there are existing threads. + std::cerr << "\t\t\tShould not be able to change thread factory detached disposition" << std::endl; + return false; + } + catch (InvalidArgumentException& ex) { + /* expected */ + } +#endif + + std::cout << "\t\t\t\tstarting.. " << std::endl; + + threadManager->start(); + threadManager->setExpireCallback(expiredNotifier); // apache::thrift::stdcxx::bind(&ThreadManagerTests::expiredNotifier, this)); + +#define EXPECT(FUNC, COUNT) { size_t c = FUNC; if (c != COUNT) { std::cerr << "expected " #FUNC" to be " #COUNT ", but was " << c << std::endl; return false; } } + + EXPECT(threadManager->workerCount(), 1); + EXPECT(threadManager->idleWorkerCount(), 1); + EXPECT(threadManager->pendingTaskCount(), 0); + + std::cout << "\t\t\t\tadd 2nd worker.. " << std::endl; + + threadManager->addWorker(); + + EXPECT(threadManager->workerCount(), 2); + EXPECT(threadManager->idleWorkerCount(), 2); + EXPECT(threadManager->pendingTaskCount(), 0); + + std::cout << "\t\t\t\tremove 2nd worker.. " << std::endl; + + threadManager->removeWorker(); + + EXPECT(threadManager->workerCount(), 1); + EXPECT(threadManager->idleWorkerCount(), 1); + EXPECT(threadManager->pendingTaskCount(), 0); + + std::cout << "\t\t\t\tremove 1st worker.. " << std::endl; + + threadManager->removeWorker(); + + EXPECT(threadManager->workerCount(), 0); + EXPECT(threadManager->idleWorkerCount(), 0); + EXPECT(threadManager->pendingTaskCount(), 0); + + std::cout << "\t\t\t\tadd blocking task.. " << std::endl; + + // We're going to throw a blocking task into the mix + Monitor entryMonitor; // signaled when task is running + Monitor blockMonitor; // to be signaled to unblock the task + bool blocked(true); // set to false before notifying + Monitor doneMonitor; // signaled when count reaches zero + size_t activeCount = 1; + shared_ptr<ThreadManagerTests::BlockTask> blockingTask( + new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked, doneMonitor, activeCount)); + threadManager->add(blockingTask); + + EXPECT(threadManager->workerCount(), 0); + EXPECT(threadManager->idleWorkerCount(), 0); + EXPECT(threadManager->pendingTaskCount(), 1); + + std::cout << "\t\t\t\tadd other task.. " << std::endl; + + shared_ptr<ThreadManagerTests::Task> otherTask( + new ThreadManagerTests::Task(doneMonitor, activeCount, 0)); + + threadManager->add(otherTask); + + EXPECT(threadManager->workerCount(), 0); + EXPECT(threadManager->idleWorkerCount(), 0); + EXPECT(threadManager->pendingTaskCount(), 2); + + std::cout << "\t\t\t\tremove blocking task specifically.. " << std::endl; + + threadManager->remove(blockingTask); + + EXPECT(threadManager->workerCount(), 0); + EXPECT(threadManager->idleWorkerCount(), 0); + EXPECT(threadManager->pendingTaskCount(), 1); + + std::cout << "\t\t\t\tremove next pending task.." << std::endl; + + shared_ptr<Runnable> nextTask = threadManager->removeNextPending(); + if (nextTask != otherTask) { + std::cerr << "\t\t\t\t\texpected removeNextPending to return otherTask" << std::endl; + return false; + } + + EXPECT(threadManager->workerCount(), 0); + EXPECT(threadManager->idleWorkerCount(), 0); + EXPECT(threadManager->pendingTaskCount(), 0); + + std::cout << "\t\t\t\tremove next pending task (none left).." << std::endl; + + nextTask = threadManager->removeNextPending(); + if (nextTask) { + std::cerr << "\t\t\t\t\texpected removeNextPending to return an empty Runnable" << std::endl; + return false; + } + + std::cout << "\t\t\t\tadd 2 expired tasks and 1 not.." << std::endl; + + shared_ptr<ThreadManagerTests::Task> expiredTask( + new ThreadManagerTests::Task(doneMonitor, activeCount, 0)); + + threadManager->add(expiredTask, 0, 1); + threadManager->add(blockingTask); // add one that hasn't expired to make sure it gets skipped + threadManager->add(expiredTask, 0, 1); // add a second expired to ensure removeExpiredTasks removes both + + sleep_(50); // make sure enough time elapses for it to expire - the shortest expiration time is 1 millisecond + + EXPECT(threadManager->workerCount(), 0); + EXPECT(threadManager->idleWorkerCount(), 0); + EXPECT(threadManager->pendingTaskCount(), 3); + EXPECT(threadManager->expiredTaskCount(), 0); + + std::cout << "\t\t\t\tremove expired tasks.." << std::endl; + + if (!m_expired.empty()) { + std::cerr << "\t\t\t\t\texpected m_expired to be empty" << std::endl; + return false; + } + + threadManager->removeExpiredTasks(); + + if (m_expired.size() != 2) { + std::cerr << "\t\t\t\t\texpected m_expired to be set" << std::endl; + return false; + } + + if (m_expired.front() != expiredTask) { + std::cerr << "\t\t\t\t\texpected m_expired[0] to be the expired task" << std::endl; + return false; + } + m_expired.pop_front(); + + if (m_expired.front() != expiredTask) { + std::cerr << "\t\t\t\t\texpected m_expired[1] to be the expired task" << std::endl; + return false; + } + + m_expired.clear(); + + threadManager->remove(blockingTask); + + EXPECT(threadManager->workerCount(), 0); + EXPECT(threadManager->idleWorkerCount(), 0); + EXPECT(threadManager->pendingTaskCount(), 0); + EXPECT(threadManager->expiredTaskCount(), 2); + + std::cout << "\t\t\t\tadd expired task (again).." << std::endl; + + threadManager->add(expiredTask, 0, 1); // expires in 1ms + sleep_(50); // make sure enough time elapses for it to expire - the shortest expiration time is 1ms + + std::cout << "\t\t\t\tadd worker to consume expired task.." << std::endl; + + threadManager->addWorker(); + sleep_(100); // make sure it has time to spin up and expire the task + + if (m_expired.empty()) { + std::cerr << "\t\t\t\t\texpected m_expired to be set" << std::endl; + return false; + } + + if (m_expired.front() != expiredTask) { + std::cerr << "\t\t\t\t\texpected m_expired to be the expired task" << std::endl; + return false; + } + + m_expired.clear(); + + EXPECT(threadManager->workerCount(), 1); + EXPECT(threadManager->idleWorkerCount(), 1); + EXPECT(threadManager->pendingTaskCount(), 0); + EXPECT(threadManager->expiredTaskCount(), 3); + + std::cout << "\t\t\t\ttry to remove too many workers" << std::endl; + try { + threadManager->removeWorker(2); + std::cerr << "\t\t\t\t\texpected InvalidArgumentException" << std::endl; + return false; + } catch (const InvalidArgumentException&) { + /* expected */ + } + + std::cout << "\t\t\t\tremove worker.. " << std::endl; + + threadManager->removeWorker(); + + EXPECT(threadManager->workerCount(), 0); + EXPECT(threadManager->idleWorkerCount(), 0); + EXPECT(threadManager->pendingTaskCount(), 0); + EXPECT(threadManager->expiredTaskCount(), 3); + + std::cout << "\t\t\t\tadd blocking task.. " << std::endl; + + threadManager->add(blockingTask); + + EXPECT(threadManager->workerCount(), 0); + EXPECT(threadManager->idleWorkerCount(), 0); + EXPECT(threadManager->pendingTaskCount(), 1); + + std::cout << "\t\t\t\tadd worker.. " << std::endl; + + threadManager->addWorker(); + { + Synchronized s(entryMonitor); + while (!blockingTask->_entered) { + entryMonitor.wait(); + } + } + + EXPECT(threadManager->workerCount(), 1); + EXPECT(threadManager->idleWorkerCount(), 0); + EXPECT(threadManager->pendingTaskCount(), 0); + + std::cout << "\t\t\t\tunblock task and remove worker.. " << std::endl; + + { + Synchronized s(blockMonitor); + blocked = false; + blockMonitor.notifyAll(); + } + threadManager->removeWorker(); + + EXPECT(threadManager->workerCount(), 0); + EXPECT(threadManager->idleWorkerCount(), 0); + EXPECT(threadManager->pendingTaskCount(), 0); + + std::cout << "\t\t\t\tcleanup.. " << std::endl; + + blockingTask.reset(); + threadManager.reset(); + return true; + } }; -const double ThreadManagerTests::TEST_TOLERANCE = .20; } } } diff --git a/lib/cpp/test/concurrency/TimerManagerTests.h b/lib/cpp/test/concurrency/TimerManagerTests.h index c6fa4cfa1..32d39355f 100644 --- a/lib/cpp/test/concurrency/TimerManagerTests.h +++ b/lib/cpp/test/concurrency/TimerManagerTests.h @@ -34,8 +34,6 @@ using namespace apache::thrift::concurrency; class TimerManagerTests { - static const double TEST_TOLERANCE; - public: class Task : public Runnable { public: @@ -52,25 +50,11 @@ public: void run() { _endTime = Util::currentTime(); - - // Figure out error percentage - - int64_t delta = _endTime - _startTime; - - delta = delta > _timeout ? delta - _timeout : _timeout - delta; - - double error = double(delta) / _timeout; - - if (error < TEST_TOLERANCE) { - _success = true; - } - - _done = true; - - std::cout << "\t\t\tTimerManagerTests::Task[" << this << "] done" << std::endl; // debug + _success = (_endTime - _startTime) >= _timeout; { Synchronized s(_monitor); + _done = true; _monitor.notifyAll(); } } @@ -147,7 +131,6 @@ public: Monitor _monitor; }; -const double TimerManagerTests::TEST_TOLERANCE = .20; } } } diff --git a/lib/cpp/test/processor/ProcessorTest.cpp b/lib/cpp/test/processor/ProcessorTest.cpp index 006665774..a4e984cc7 100644 --- a/lib/cpp/test/processor/ProcessorTest.cpp +++ b/lib/cpp/test/processor/ProcessorTest.cpp @@ -299,7 +299,7 @@ void checkNoEvents(const boost::shared_ptr<EventLog>& log) { */ uint32_t checkNewConnEvents(const boost::shared_ptr<EventLog>& log) { // Check for an ET_CONN_CREATED event - Event event = log->waitForEvent(); + Event event = log->waitForEvent(2500); BOOST_CHECK_EQUAL(EventLog::ET_CONN_CREATED, event.type); // Some servers call the processContext() hook immediately. diff --git a/test/cpp/src/TestServer.cpp b/test/cpp/src/TestServer.cpp index 4808d89f4..b4378608b 100644 --- a/test/cpp/src/TestServer.cpp +++ b/test/cpp/src/TestServer.cpp @@ -746,7 +746,7 @@ int main(int argc, char** argv) { if (server.get() != NULL) { if (protocol_type == "header") { - // Tell the server to use the same protocol for input / output + // Tell the server to use the same protocol for input / output // if using header server->setOutputProtocolFactory(boost::shared_ptr<TProtocolFactory>()); } |