summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/sys/Serializer.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/sys/Serializer.h')
-rw-r--r--qpid/cpp/src/qpid/sys/Serializer.h130
1 files changed, 105 insertions, 25 deletions
diff --git a/qpid/cpp/src/qpid/sys/Serializer.h b/qpid/cpp/src/qpid/sys/Serializer.h
index 337686cca0..085d51d7e2 100644
--- a/qpid/cpp/src/qpid/sys/Serializer.h
+++ b/qpid/cpp/src/qpid/sys/Serializer.h
@@ -36,29 +36,69 @@
namespace qpid {
namespace sys {
+/** Abstract base class for Serializer below. */
+class SerializerBase : private boost::noncopyable, private Runnable
+{
+ public:
+ typedef boost::function<void()> VoidFn0;
+ /** @see Serializer::Serializer */
+ SerializerBase(bool immediate=true, VoidFn0 notifyDispatch=VoidFn0());
+
+ virtual ~SerializerBase();
+
+ virtual void dispatch() = 0;
+ protected:
+ enum State {
+ IDLE, ///< No threads are active.
+ EXECUTING, ///< execute() is executing a single task.
+ DISPATCHING, ///< dispatch() is draining the queue.
+ SHUTDOWN ///< SerailizerBase is being destroyed.
+ };
+
+ void notifyWorker();
+ void run();
+ virtual bool empty() = 0;
+
+ Monitor lock;
+ State state;
+ bool immediate;
+ Thread worker;
+ boost::function<void()> notifyDispatch;
+};
+
+
/**
* Execute tasks sequentially, queuing tasks when necessary to
* ensure only one thread at a time executes a task and tasks
* are executed in order.
+ *
+ * Task is a void returning 0-arg functor. It must not throw exceptions.
+ *
+ * Note we deliberately do not use boost::function as the task type
+ * because copying a boost::functor allocates the target object on the
+ * heap.
*/
-class Serializer : private boost::noncopyable, private Runnable
-{
- public:
- typedef boost::function<void()> Task;
+template <class Task>
+class Serializer : public SerializerBase {
+
+ std::deque<Task> queue;
+ bool empty() { return queue.empty(); }
+ void dispatch(Task& task);
+
+ public:
/** Start a serializer.
*
* @param notifyDispatch Called when work is pending and there is no
* active dispatch thread. Must arrange for dispatch() to be called
* in some thread other than the calling thread and return.
- * By default the Serializer supplies its own dispatch thread.
+ * By default the Serailizer supplies its own dispatch thread.
*
* @param immediate Allow execute() to execute a task immediatly
* in the current thread.
*/
- Serializer(bool immediate=true, Task notifyDispatch=Task());
-
- ~Serializer();
+ Serializer(bool immediate=true, VoidFn0 notifyDispatch=VoidFn0())
+ : SerializerBase(immediate, notifyDispatch) {}
/**
* Task may be executed immediately in the calling thread if there
@@ -68,33 +108,73 @@ class Serializer : private boost::noncopyable, private Runnable
*/
void execute(Task& task);
+
/** Execute pending tasks sequentially in calling thread.
* Drains the task queue and returns, does not block for more tasks.
*
* @exception ShutdownException if the serializer is being destroyed.
*/
void dispatch();
-
- private:
- enum State {
- IDLE, ///< No threads are active.
- EXECUTING, ///< execute() is executing a single task.
- DISPATCHING, ///< dispatch() is draining the queue.
- SHUTDOWN ///< Serializer is being destroyed.
};
- void dispatch(Task&);
- void notifyWorker();
- void run();
- Monitor lock;
+template <class Task>
+void Serializer<Task>::execute(Task& task) {
+ bool needNotify = false;
+ {
+ Mutex::ScopedLock l(lock);
+ assert(state != SHUTDOWN);
+ if (immediate && state == IDLE) {
+ state = EXECUTING;
+ dispatch(task);
+ if (state != SHUTDOWN) {
+ assert(state == EXECUTING);
+ state = IDLE;
+ }
+ }
+ else
+ queue.push_back(task);
+ if (!queue.empty() && state == IDLE) {
+ state = DISPATCHING;
+ needNotify = true;
+ }
+ }
+ if (needNotify)
+ notifyDispatch(); // Not my function, call outside lock.
+}
+
+template <class Task>
+void Serializer<Task>::dispatch() {
+ Mutex::ScopedLock l(lock);
+ // TODO aconway 2007-07-16: This loop could be unbounded
+ // if other threads add work while we're in dispatch(Task&).
+ // If we need to bound it we could dispatch just the elements
+ // that were enqueued when dispatch() was first called - save
+ // begin() iterator and pop only up to that.
+ while (!queue.empty() && state != SHUTDOWN) {
+ assert(state == DISPATCHING);
+ dispatch(queue.front());
+ queue.pop_front();
+ }
+ if (state != SHUTDOWN) {
+ assert(state == DISPATCHING);
+ state = IDLE;
+ }
+}
+
+template <class Task>
+void Serializer<Task>::dispatch(Task& task) {
+ Mutex::ScopedUnlock u(lock);
+ // Preconditions: lock is held, state is EXECUTING or DISPATCHING
+ assert(state != IDLE);
+ assert(state != SHUTDOWN);
+ assert(state == EXECUTING || state == DISPATCHING);
+ // No exceptions allowed in task.
+ try { task(); } catch (...) { assert(0); }
+}
+
+
- State state;
- bool immediate;
- std::deque<Task> queue;
- Thread worker;
- Task notifyDispatch;
-};
}} // namespace qpid::sys