diff options
Diffstat (limited to 'lib/cpp/src/concurrency/test/ThreadManagerTests.h')
-rw-r--r-- | lib/cpp/src/concurrency/test/ThreadManagerTests.h | 366 |
1 files changed, 366 insertions, 0 deletions
diff --git a/lib/cpp/src/concurrency/test/ThreadManagerTests.h b/lib/cpp/src/concurrency/test/ThreadManagerTests.h new file mode 100644 index 000000000..e7b517431 --- /dev/null +++ b/lib/cpp/src/concurrency/test/ThreadManagerTests.h @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <config.h> +#include <concurrency/ThreadManager.h> +#include <concurrency/PosixThreadFactory.h> +#include <concurrency/Monitor.h> +#include <concurrency/Util.h> + +#include <assert.h> +#include <set> +#include <iostream> +#include <set> +#include <stdint.h> + +namespace apache { namespace thrift { namespace concurrency { namespace test { + +using namespace apache::thrift::concurrency; + +/** + * ThreadManagerTests class + * + * @version $Id:$ + */ +class ThreadManagerTests { + +public: + + static const double ERROR; + + class Task: public Runnable { + + public: + + Task(Monitor& monitor, size_t& count, int64_t timeout) : + _monitor(monitor), + _count(count), + _timeout(timeout), + _done(false) {} + + void run() { + + _startTime = Util::currentTime(); + + { + Synchronized s(_sleep); + + try { + _sleep.wait(_timeout); + } catch(TimedOutException& e) { + ; + }catch(...) { + assert(0); + } + } + + _endTime = Util::currentTime(); + + _done = true; + + { + Synchronized s(_monitor); + + // std::cout << "Thread " << _count << " completed " << std::endl; + + _count--; + + if (_count == 0) { + + _monitor.notify(); + } + } + } + + Monitor& _monitor; + size_t& _count; + int64_t _timeout; + int64_t _startTime; + int64_t _endTime; + bool _done; + Monitor _sleep; + }; + + /** + * Dispatch count tasks, each of which blocks for timeout milliseconds then + * completes. Verify that all tasks completed and that thread manager cleans + * up properly on delete. + */ + bool loadTest(size_t count=100, int64_t timeout=100LL, size_t workerCount=4) { + + Monitor monitor; + + size_t activeCount = count; + + shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount); + + shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory()); + + threadFactory->setPriority(PosixThreadFactory::HIGHEST); + + threadManager->threadFactory(threadFactory); + + threadManager->start(); + + std::set<shared_ptr<ThreadManagerTests::Task> > tasks; + + for (size_t ix = 0; ix < count; ix++) { + + tasks.insert(shared_ptr<ThreadManagerTests::Task>(new ThreadManagerTests::Task(monitor, activeCount, timeout))); + } + + int64_t time00 = Util::currentTime(); + + for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) { + + threadManager->add(*ix); + } + + { + Synchronized s(monitor); + + while(activeCount > 0) { + + monitor.wait(); + } + } + + int64_t time01 = Util::currentTime(); + + int64_t firstTime = 9223372036854775807LL; + int64_t lastTime = 0; + + double averageTime = 0; + int64_t minTime = 9223372036854775807LL; + int64_t maxTime = 0; + + for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) { + + shared_ptr<ThreadManagerTests::Task> task = *ix; + + int64_t delta = task->_endTime - task->_startTime; + + assert(delta > 0); + + if (task->_startTime < firstTime) { + firstTime = task->_startTime; + } + + if (task->_endTime > lastTime) { + lastTime = task->_endTime; + } + + if (delta < minTime) { + minTime = delta; + } + + if (delta > maxTime) { + maxTime = delta; + } + + averageTime+= delta; + } + + averageTime /= count; + + std::cout << "\t\t\tfirst start: " << firstTime << "ms Last end: " << lastTime << "ms min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime << "ms" << std::endl; + + double expectedTime = ((count + (workerCount - 1)) / workerCount) * timeout; + + double error = ((time01 - time00) - expectedTime) / expectedTime; + + if (error < 0) { + error*= -1.0; + } + + bool success = error < ERROR; + + std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << expectedTime << "ms elapsed time: "<< time01 - time00 << "ms error%: " << error * 100.0 << std::endl; + + return success; + } + + class BlockTask: public Runnable { + + public: + + BlockTask(Monitor& monitor, Monitor& bmonitor, size_t& count) : + _monitor(monitor), + _bmonitor(bmonitor), + _count(count) {} + + void run() { + { + Synchronized s(_bmonitor); + + _bmonitor.wait(); + + } + + { + Synchronized s(_monitor); + + _count--; + + if (_count == 0) { + + _monitor.notify(); + } + } + } + + Monitor& _monitor; + Monitor& _bmonitor; + size_t& _count; + }; + + /** + * Block test. Create pendingTaskCountMax tasks. Verify that we block adding the + * pendingTaskCountMax + 1th task. Verify that we unblock when a task completes */ + + bool blockTest(int64_t timeout=100LL, size_t workerCount=2) { + + bool success = false; + + try { + + Monitor bmonitor; + Monitor monitor; + + size_t pendingTaskMaxCount = workerCount; + + size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1}; + + shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount); + + shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory()); + + threadFactory->setPriority(PosixThreadFactory::HIGHEST); + + threadManager->threadFactory(threadFactory); + + threadManager->start(); + + std::set<shared_ptr<ThreadManagerTests::BlockTask> > tasks; + + for (size_t ix = 0; ix < workerCount; ix++) { + + tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[0]))); + } + + for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) { + + tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[1]))); + } + + for (std::set<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) { + threadManager->add(*ix); + } + + if(!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) { + throw TException("Unexpected pending task count"); + } + + shared_ptr<ThreadManagerTests::BlockTask> extraTask(new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[2])); + + try { + threadManager->add(extraTask, 1); + throw TException("Unexpected success adding task in excess of pending task count"); + } catch(TimedOutException& e) { + } + + std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl; + + { + Synchronized s(bmonitor); + + bmonitor.notifyAll(); + } + + { + Synchronized s(monitor); + + while(activeCounts[0] != 0) { + monitor.wait(); + } + } + + std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl; + + try { + threadManager->add(extraTask, 1); + } catch(TimedOutException& e) { + std::cout << "\t\t\t" << "add timed out unexpectedly" << std::endl; + throw TException("Unexpected timeout adding task"); + + } catch(TooManyPendingTasksException& e) { + std::cout << "\t\t\t" << "add encountered too many pending exepctions" << std::endl; + throw TException("Unexpected timeout adding task"); + } + + // Wake up tasks that were pending before and wait for them to complete + + { + Synchronized s(bmonitor); + + bmonitor.notifyAll(); + } + + { + Synchronized s(monitor); + + while(activeCounts[1] != 0) { + monitor.wait(); + } + } + + // Wake up the extra task and wait for it to complete + + { + Synchronized s(bmonitor); + + bmonitor.notifyAll(); + } + + { + Synchronized s(monitor); + + while(activeCounts[2] != 0) { + monitor.wait(); + } + } + + if(!(success = (threadManager->totalTaskCount() == 0))) { + throw TException("Unexpected pending task count"); + } + + } catch(TException& e) { + } + + std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl; + return success; + } +}; + +const double ThreadManagerTests::ERROR = .20; + +}}}} // apache::thrift::concurrency + +using namespace apache::thrift::concurrency::test; + |