diff options
Diffstat (limited to 'cpp/lib/common/sys/posix/EventChannelThreads.h')
-rw-r--r-- | cpp/lib/common/sys/posix/EventChannelThreads.h | 43 |
1 files changed, 29 insertions, 14 deletions
diff --git a/cpp/lib/common/sys/posix/EventChannelThreads.h b/cpp/lib/common/sys/posix/EventChannelThreads.h index 98403c0869..721a5e9d24 100644 --- a/cpp/lib/common/sys/posix/EventChannelThreads.h +++ b/cpp/lib/common/sys/posix/EventChannelThreads.h @@ -20,11 +20,12 @@ */ #include <vector> -#include <Exception.h> -#include <sys/Time.h> -#include <sys/Monitor.h> -#include <sys/Thread.h> -#include <sys/AtomicCount.h> +#include "Exception.h" +#include "sys/AtomicCount.h" +#include "sys/Monitor.h" +#include "sys/Thread.h" +#include "sys/Time.h" + #include "EventChannel.h" namespace qpid { @@ -33,26 +34,36 @@ namespace sys { /** Dynamic thread pool serving an EventChannel. - Threads run a loop { e = getEvent(); e->dispatch(); } + Threads run a loop { e = wait(); e->dispatch(); } The size of the thread pool is automatically adjusted to optimal size. */ class EventChannelThreads : public qpid::SharedObject<EventChannelThreads>, - public sys::Monitor, private sys::Runnable + private sys::Runnable { public: - /** Create the thread pool and start initial threads. */ + /** Constant to represent an unlimited number of threads */ + static const size_t unlimited; + + /** + * Create the thread pool and start initial threads. + * @param minThreads Pool will initialy contain minThreads threads and + * will never shrink to less until shutdown. + * @param maxThreads Pool will never grow to more than maxThreads. + */ static EventChannelThreads::shared_ptr create( - EventChannel::shared_ptr channel + EventChannel::shared_ptr channel = EventChannel::create(), + size_t minThreads = 1, + size_t maxThreads = unlimited ); ~EventChannelThreads(); /** Post event to the underlying channel */ - void postEvent(Event& event) { channel->postEvent(event); } + void post(Event& event) { channel->post(event); } /** Post event to the underlying channel Must not be 0. */ - void postEvent(Event* event) { channel->postEvent(event); } + void post(Event* event) { channel->post(event); } /** * Terminate all threads. @@ -68,21 +79,25 @@ class EventChannelThreads : private: typedef std::vector<sys::Thread> Threads; typedef enum { - RUNNING, TERMINATE_SENT, JOINING, SHUTDOWN + RUNNING, TERMINATING, JOINING, SHUTDOWN } State; - EventChannelThreads(EventChannel::shared_ptr underlyingChannel); + EventChannelThreads( + EventChannel::shared_ptr channel, size_t min, size_t max); + void addThread(); void run(); bool keepRunning(); void adjustThreads(); + Monitor monitor; + size_t minThreads; + size_t maxThreads; EventChannel::shared_ptr channel; Threads workers; sys::AtomicCount nWaiting; State state; - Event terminate; }; |