summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/posix/EventChannelThreads.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/posix/EventChannelThreads.cpp')
-rw-r--r--cpp/src/qpid/sys/posix/EventChannelThreads.cpp75
1 files changed, 42 insertions, 33 deletions
diff --git a/cpp/src/qpid/sys/posix/EventChannelThreads.cpp b/cpp/src/qpid/sys/posix/EventChannelThreads.cpp
index 68c57405d5..70954d0c16 100644
--- a/cpp/src/qpid/sys/posix/EventChannelThreads.cpp
+++ b/cpp/src/qpid/sys/posix/EventChannelThreads.cpp
@@ -16,27 +16,40 @@
*
*/
-#include "EventChannelThreads.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/log/Statement.h"
#include <iostream>
-using namespace std;
+#include <limits>
+
#include <boost/bind.hpp>
+#include "qpid/sys/Runnable.h"
+
+#include "EventChannelThreads.h"
+
namespace qpid {
namespace sys {
+const size_t EventChannelThreads::unlimited =
+ std::numeric_limits<size_t>::max();
+
EventChannelThreads::shared_ptr EventChannelThreads::create(
- EventChannel::shared_ptr ec)
+ EventChannel::shared_ptr ec, size_t min, size_t max
+)
{
- return EventChannelThreads::shared_ptr(new EventChannelThreads(ec));
+ return EventChannelThreads::shared_ptr(
+ new EventChannelThreads(ec, min, max));
}
-EventChannelThreads::EventChannelThreads(EventChannel::shared_ptr ec) :
- channel(ec), nWaiting(0), state(RUNNING)
+EventChannelThreads::EventChannelThreads(
+ EventChannel::shared_ptr ec, size_t min, size_t max) :
+ minThreads(std::max(size_t(1), min)),
+ maxThreads(std::min(min, max)),
+ channel(ec),
+ nWaiting(0),
+ state(RUNNING)
{
- // TODO aconway 2006-11-15: Estimate initial threads based on CPUs.
- addThread();
+ Monitor::ScopedLock l(monitor);
+ while (workers.size() < minThreads)
+ workers.push_back(Thread(*this));
}
EventChannelThreads::~EventChannelThreads() {
@@ -46,32 +59,30 @@ EventChannelThreads::~EventChannelThreads() {
void EventChannelThreads::shutdown()
{
- ScopedLock lock(*this);
+ Monitor::ScopedLock lock(monitor);
if (state != RUNNING) // Already shutting down.
return;
- for (size_t i = 0; i < workers.size(); ++i) {
- channel->postEvent(terminate);
- }
- state = TERMINATE_SENT;
- notify(); // Wake up one join() thread.
+ state = TERMINATING;
+ channel->shutdown();
+ monitor.notify(); // Wake up one join() thread.
}
void EventChannelThreads::join()
{
{
- ScopedLock lock(*this);
+ Monitor::ScopedLock lock(monitor);
while (state == RUNNING) // Wait for shutdown to start.
- wait();
+ monitor.wait();
if (state == SHUTDOWN) // Shutdown is complete
return;
if (state == JOINING) {
// Someone else is doing the join.
while (state != SHUTDOWN)
- wait();
+ monitor.wait();
return;
}
// I'm the joining thread
- assert(state == TERMINATE_SENT);
+ assert(state == TERMINATING);
state = JOINING;
} // Drop the lock.
@@ -80,12 +91,13 @@ void EventChannelThreads::join()
workers[i].join();
}
state = SHUTDOWN;
- notifyAll(); // Notify other join() threaeds.
+ monitor.notifyAll(); // Notify any other join() threads.
}
void EventChannelThreads::addThread() {
- ScopedLock l(*this);
- workers.push_back(Thread(*this));
+ Monitor::ScopedLock l(monitor);
+ if (workers.size() < maxThreads)
+ workers.push_back(Thread(*this));
}
void EventChannelThreads::run()
@@ -94,23 +106,20 @@ void EventChannelThreads::run()
AtomicCount::ScopedIncrement inc(nWaiting);
try {
while (true) {
- Event* e = channel->getEvent();
+ Event* e = channel->wait();
assert(e != 0);
- if (e == &terminate) {
- return;
- }
AtomicCount::ScopedDecrement dec(nWaiting);
- // I'm no longer waiting, make sure someone is.
- if (dec == 0)
+ // Make sure there's at least one waiting thread.
+ if (dec == 0 && state == RUNNING)
addThread();
e->dispatch();
}
}
- catch (const std::exception& e) {
- QPID_LOG(error, e.what());
+ catch (const EventChannel::ShutdownException& e) {
+ return;
}
- catch (...) {
- QPID_LOG(error, "unknown exception");
+ catch (const std::exception& e) {
+ Exception::log(e, "Exception in EventChannelThreads::run()");
}
}