diff options
Diffstat (limited to 'cpp/src/qpid/sys/Serializer.cpp')
-rw-r--r-- | cpp/src/qpid/sys/Serializer.cpp | 71 |
1 files changed, 5 insertions, 66 deletions
diff --git a/cpp/src/qpid/sys/Serializer.cpp b/cpp/src/qpid/sys/Serializer.cpp index faf94a0f93..76dfaa6f6a 100644 --- a/cpp/src/qpid/sys/Serializer.cpp +++ b/cpp/src/qpid/sys/Serializer.cpp @@ -29,14 +29,14 @@ namespace qpid { namespace sys { -Serializer::Serializer(bool allowImmediate, Task notifyDispatchFn) +SerializerBase::SerializerBase(bool allowImmediate, VoidFn0 notifyDispatchFn) : state(IDLE), immediate(allowImmediate), notifyDispatch(notifyDispatchFn) { if (notifyDispatch.empty()) - notifyDispatch = boost::bind(&Serializer::notifyWorker, this); + notifyDispatch = boost::bind(&SerializerBase::notifyWorker, this); } -Serializer::~Serializer() { +SerializerBase::~SerializerBase() { { Mutex::ScopedLock l(lock); state = SHUTDOWN; @@ -46,75 +46,14 @@ Serializer::~Serializer() { worker.join(); } -void Serializer::dispatch(Task& task) { - Mutex::ScopedUnlock u(lock); - // Preconditions: lock is held, state is EXECUTING or DISPATCHING - assert(state != IDLE); - assert(state != SHUTDOWN); - assert(state == EXECUTING || state == DISPATCHING); - try { - task(); - } catch (const std::exception& e) { - QPID_LOG(critical, "Unexpected exception in Serializer::dispatch" - << e.what()); - assert(0); // Should not happen. - } catch (...) { - QPID_LOG(critical, "Unexpected exception in Serializer::dispatch."); - assert(0); // Should not happen. - } -} - -void Serializer::execute(Task& task) { - bool needNotify = false; - { - Mutex::ScopedLock l(lock); - assert(state != SHUTDOWN); - if (immediate && state == IDLE) { - state = EXECUTING; - dispatch(task); - if (state != SHUTDOWN) { - assert(state == EXECUTING); - state = IDLE; - } - } - else - queue.push_back(task); - - if (!queue.empty() && state == IDLE) { - state = DISPATCHING; - needNotify = true; - } - } - if (needNotify) - notifyDispatch(); // Not my function, call outside lock. -} - -void Serializer::dispatch() { - Mutex::ScopedLock l(lock); - // TODO aconway 2007-07-16: This loop could be unbounded - // if other threads add work while we're in dispatch(Task&). - // If we need to bound it we could dispatch just the elements - // that were enqueued when dispatch() was first called - save - // begin() iterator and pop only up to that. - while (!queue.empty() && state != SHUTDOWN) { - assert(state == DISPATCHING); - dispatch(queue.front()); - queue.pop_front(); - } - if (state != SHUTDOWN) { - assert(state == DISPATCHING); - state = IDLE; - } -} - -void Serializer::notifyWorker() { +void SerializerBase::notifyWorker() { if (!worker.id()) worker = Thread(*this); else lock.notify(); } -void Serializer::run() { +void SerializerBase::run() { Mutex::ScopedLock l(lock); while (state != SHUTDOWN) { dispatch(); |