diff options
author | Alan Conway <aconway@apache.org> | 2007-08-21 23:35:23 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-08-21 23:35:23 +0000 |
commit | 9ef0c3dc8bc5ef4af668a3c19f8e254fb5e01ada (patch) | |
tree | b8afa05ed63e9d2d05392df406053c0d69d2fc36 /cpp/src/qpid/sys/Serializer.h | |
parent | 9ebcf9839197cafe78beb8dfa14b803bd78f5a5e (diff) | |
download | qpid-python-9ef0c3dc8bc5ef4af668a3c19f8e254fb5e01ada.tar.gz |
* src/qpid/sys/Serializer.h, .cpp:
Template Serializer on functor for execute().
Old Serializer equivalent to Serializer<boost::function<void()> >
* src/qpid/broker/BrokerQueue.h, .cpp:
Use hand-written functor for Serializer instead of boost::function.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@568332 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/Serializer.h')
-rw-r--r-- | cpp/src/qpid/sys/Serializer.h | 130 |
1 files changed, 105 insertions, 25 deletions
diff --git a/cpp/src/qpid/sys/Serializer.h b/cpp/src/qpid/sys/Serializer.h index 337686cca0..085d51d7e2 100644 --- a/cpp/src/qpid/sys/Serializer.h +++ b/cpp/src/qpid/sys/Serializer.h @@ -36,29 +36,69 @@ namespace qpid { namespace sys { +/** Abstract base class for Serializer below. */ +class SerializerBase : private boost::noncopyable, private Runnable +{ + public: + typedef boost::function<void()> VoidFn0; + /** @see Serializer::Serializer */ + SerializerBase(bool immediate=true, VoidFn0 notifyDispatch=VoidFn0()); + + virtual ~SerializerBase(); + + virtual void dispatch() = 0; + protected: + enum State { + IDLE, ///< No threads are active. + EXECUTING, ///< execute() is executing a single task. + DISPATCHING, ///< dispatch() is draining the queue. + SHUTDOWN ///< SerailizerBase is being destroyed. + }; + + void notifyWorker(); + void run(); + virtual bool empty() = 0; + + Monitor lock; + State state; + bool immediate; + Thread worker; + boost::function<void()> notifyDispatch; +}; + + /** * Execute tasks sequentially, queuing tasks when necessary to * ensure only one thread at a time executes a task and tasks * are executed in order. + * + * Task is a void returning 0-arg functor. It must not throw exceptions. + * + * Note we deliberately do not use boost::function as the task type + * because copying a boost::functor allocates the target object on the + * heap. */ -class Serializer : private boost::noncopyable, private Runnable -{ - public: - typedef boost::function<void()> Task; +template <class Task> +class Serializer : public SerializerBase { + + std::deque<Task> queue; + bool empty() { return queue.empty(); } + void dispatch(Task& task); + + public: /** Start a serializer. * * @param notifyDispatch Called when work is pending and there is no * active dispatch thread. Must arrange for dispatch() to be called * in some thread other than the calling thread and return. - * By default the Serializer supplies its own dispatch thread. + * By default the Serailizer supplies its own dispatch thread. * * @param immediate Allow execute() to execute a task immediatly * in the current thread. */ - Serializer(bool immediate=true, Task notifyDispatch=Task()); - - ~Serializer(); + Serializer(bool immediate=true, VoidFn0 notifyDispatch=VoidFn0()) + : SerializerBase(immediate, notifyDispatch) {} /** * Task may be executed immediately in the calling thread if there @@ -68,33 +108,73 @@ class Serializer : private boost::noncopyable, private Runnable */ void execute(Task& task); + /** Execute pending tasks sequentially in calling thread. * Drains the task queue and returns, does not block for more tasks. * * @exception ShutdownException if the serializer is being destroyed. */ void dispatch(); - - private: - enum State { - IDLE, ///< No threads are active. - EXECUTING, ///< execute() is executing a single task. - DISPATCHING, ///< dispatch() is draining the queue. - SHUTDOWN ///< Serializer is being destroyed. }; - void dispatch(Task&); - void notifyWorker(); - void run(); - Monitor lock; +template <class Task> +void Serializer<Task>::execute(Task& task) { + bool needNotify = false; + { + Mutex::ScopedLock l(lock); + assert(state != SHUTDOWN); + if (immediate && state == IDLE) { + state = EXECUTING; + dispatch(task); + if (state != SHUTDOWN) { + assert(state == EXECUTING); + state = IDLE; + } + } + else + queue.push_back(task); + if (!queue.empty() && state == IDLE) { + state = DISPATCHING; + needNotify = true; + } + } + if (needNotify) + notifyDispatch(); // Not my function, call outside lock. +} + +template <class Task> +void Serializer<Task>::dispatch() { + Mutex::ScopedLock l(lock); + // TODO aconway 2007-07-16: This loop could be unbounded + // if other threads add work while we're in dispatch(Task&). + // If we need to bound it we could dispatch just the elements + // that were enqueued when dispatch() was first called - save + // begin() iterator and pop only up to that. + while (!queue.empty() && state != SHUTDOWN) { + assert(state == DISPATCHING); + dispatch(queue.front()); + queue.pop_front(); + } + if (state != SHUTDOWN) { + assert(state == DISPATCHING); + state = IDLE; + } +} + +template <class Task> +void Serializer<Task>::dispatch(Task& task) { + Mutex::ScopedUnlock u(lock); + // Preconditions: lock is held, state is EXECUTING or DISPATCHING + assert(state != IDLE); + assert(state != SHUTDOWN); + assert(state == EXECUTING || state == DISPATCHING); + // No exceptions allowed in task. + try { task(); } catch (...) { assert(0); } +} + + - State state; - bool immediate; - std::deque<Task> queue; - Thread worker; - Task notifyDispatch; -}; }} // namespace qpid::sys |