diff options
Diffstat (limited to 'cpp/src/qpid/sys/posix/EventChannelThreads.cpp')
-rw-r--r-- | cpp/src/qpid/sys/posix/EventChannelThreads.cpp | 75 |
1 files changed, 42 insertions, 33 deletions
diff --git a/cpp/src/qpid/sys/posix/EventChannelThreads.cpp b/cpp/src/qpid/sys/posix/EventChannelThreads.cpp index 68c57405d5..70954d0c16 100644 --- a/cpp/src/qpid/sys/posix/EventChannelThreads.cpp +++ b/cpp/src/qpid/sys/posix/EventChannelThreads.cpp @@ -16,27 +16,40 @@ * */ -#include "EventChannelThreads.h" -#include "qpid/sys/Runnable.h" -#include "qpid/log/Statement.h" #include <iostream> -using namespace std; +#include <limits> + #include <boost/bind.hpp> +#include "qpid/sys/Runnable.h" + +#include "EventChannelThreads.h" + namespace qpid { namespace sys { +const size_t EventChannelThreads::unlimited = + std::numeric_limits<size_t>::max(); + EventChannelThreads::shared_ptr EventChannelThreads::create( - EventChannel::shared_ptr ec) + EventChannel::shared_ptr ec, size_t min, size_t max +) { - return EventChannelThreads::shared_ptr(new EventChannelThreads(ec)); + return EventChannelThreads::shared_ptr( + new EventChannelThreads(ec, min, max)); } -EventChannelThreads::EventChannelThreads(EventChannel::shared_ptr ec) : - channel(ec), nWaiting(0), state(RUNNING) +EventChannelThreads::EventChannelThreads( + EventChannel::shared_ptr ec, size_t min, size_t max) : + minThreads(std::max(size_t(1), min)), + maxThreads(std::min(min, max)), + channel(ec), + nWaiting(0), + state(RUNNING) { - // TODO aconway 2006-11-15: Estimate initial threads based on CPUs. - addThread(); + Monitor::ScopedLock l(monitor); + while (workers.size() < minThreads) + workers.push_back(Thread(*this)); } EventChannelThreads::~EventChannelThreads() { @@ -46,32 +59,30 @@ EventChannelThreads::~EventChannelThreads() { void EventChannelThreads::shutdown() { - ScopedLock lock(*this); + Monitor::ScopedLock lock(monitor); if (state != RUNNING) // Already shutting down. return; - for (size_t i = 0; i < workers.size(); ++i) { - channel->postEvent(terminate); - } - state = TERMINATE_SENT; - notify(); // Wake up one join() thread. + state = TERMINATING; + channel->shutdown(); + monitor.notify(); // Wake up one join() thread. } void EventChannelThreads::join() { { - ScopedLock lock(*this); + Monitor::ScopedLock lock(monitor); while (state == RUNNING) // Wait for shutdown to start. - wait(); + monitor.wait(); if (state == SHUTDOWN) // Shutdown is complete return; if (state == JOINING) { // Someone else is doing the join. while (state != SHUTDOWN) - wait(); + monitor.wait(); return; } // I'm the joining thread - assert(state == TERMINATE_SENT); + assert(state == TERMINATING); state = JOINING; } // Drop the lock. @@ -80,12 +91,13 @@ void EventChannelThreads::join() workers[i].join(); } state = SHUTDOWN; - notifyAll(); // Notify other join() threaeds. + monitor.notifyAll(); // Notify any other join() threads. } void EventChannelThreads::addThread() { - ScopedLock l(*this); - workers.push_back(Thread(*this)); + Monitor::ScopedLock l(monitor); + if (workers.size() < maxThreads) + workers.push_back(Thread(*this)); } void EventChannelThreads::run() @@ -94,23 +106,20 @@ void EventChannelThreads::run() AtomicCount::ScopedIncrement inc(nWaiting); try { while (true) { - Event* e = channel->getEvent(); + Event* e = channel->wait(); assert(e != 0); - if (e == &terminate) { - return; - } AtomicCount::ScopedDecrement dec(nWaiting); - // I'm no longer waiting, make sure someone is. - if (dec == 0) + // Make sure there's at least one waiting thread. + if (dec == 0 && state == RUNNING) addThread(); e->dispatch(); } } - catch (const std::exception& e) { - QPID_LOG(error, e.what()); + catch (const EventChannel::ShutdownException& e) { + return; } - catch (...) { - QPID_LOG(error, "unknown exception"); + catch (const std::exception& e) { + Exception::log(e, "Exception in EventChannelThreads::run()"); } } |