summaryrefslogtreecommitdiff
path: root/cpp/lib/common/sys/posix/EventChannelThreads.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/common/sys/posix/EventChannelThreads.h')
-rw-r--r--cpp/lib/common/sys/posix/EventChannelThreads.h43
1 files changed, 29 insertions, 14 deletions
diff --git a/cpp/lib/common/sys/posix/EventChannelThreads.h b/cpp/lib/common/sys/posix/EventChannelThreads.h
index 98403c0869..721a5e9d24 100644
--- a/cpp/lib/common/sys/posix/EventChannelThreads.h
+++ b/cpp/lib/common/sys/posix/EventChannelThreads.h
@@ -20,11 +20,12 @@
*/
#include <vector>
-#include <Exception.h>
-#include <sys/Time.h>
-#include <sys/Monitor.h>
-#include <sys/Thread.h>
-#include <sys/AtomicCount.h>
+#include "Exception.h"
+#include "sys/AtomicCount.h"
+#include "sys/Monitor.h"
+#include "sys/Thread.h"
+#include "sys/Time.h"
+
#include "EventChannel.h"
namespace qpid {
@@ -33,26 +34,36 @@ namespace sys {
/**
Dynamic thread pool serving an EventChannel.
- Threads run a loop { e = getEvent(); e->dispatch(); }
+ Threads run a loop { e = wait(); e->dispatch(); }
The size of the thread pool is automatically adjusted to optimal size.
*/
class EventChannelThreads :
public qpid::SharedObject<EventChannelThreads>,
- public sys::Monitor, private sys::Runnable
+ private sys::Runnable
{
public:
- /** Create the thread pool and start initial threads. */
+ /** Constant to represent an unlimited number of threads */
+ static const size_t unlimited;
+
+ /**
+ * Create the thread pool and start initial threads.
+ * @param minThreads Pool will initialy contain minThreads threads and
+ * will never shrink to less until shutdown.
+ * @param maxThreads Pool will never grow to more than maxThreads.
+ */
static EventChannelThreads::shared_ptr create(
- EventChannel::shared_ptr channel
+ EventChannel::shared_ptr channel = EventChannel::create(),
+ size_t minThreads = 1,
+ size_t maxThreads = unlimited
);
~EventChannelThreads();
/** Post event to the underlying channel */
- void postEvent(Event& event) { channel->postEvent(event); }
+ void post(Event& event) { channel->post(event); }
/** Post event to the underlying channel Must not be 0. */
- void postEvent(Event* event) { channel->postEvent(event); }
+ void post(Event* event) { channel->post(event); }
/**
* Terminate all threads.
@@ -68,21 +79,25 @@ class EventChannelThreads :
private:
typedef std::vector<sys::Thread> Threads;
typedef enum {
- RUNNING, TERMINATE_SENT, JOINING, SHUTDOWN
+ RUNNING, TERMINATING, JOINING, SHUTDOWN
} State;
- EventChannelThreads(EventChannel::shared_ptr underlyingChannel);
+ EventChannelThreads(
+ EventChannel::shared_ptr channel, size_t min, size_t max);
+
void addThread();
void run();
bool keepRunning();
void adjustThreads();
+ Monitor monitor;
+ size_t minThreads;
+ size_t maxThreads;
EventChannel::shared_ptr channel;
Threads workers;
sys::AtomicCount nWaiting;
State state;
- Event terminate;
};