diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 20 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Serializer.cpp | 2 |
3 files changed, 7 insertions, 16 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 757f0aa62d..41a5767457 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -46,8 +46,7 @@ Queue::Queue(const string& _name, bool _autodelete, const ConnectionToken* const _owner, Manageable* parent) : - dispatching(false), - name(_name), + name(_name), autodelete(_autodelete), store(_store), owner(_owner), @@ -76,8 +75,7 @@ void Queue::notifyDurableIOComplete() { // signal SemanticHander to ack completed dequeues // then dispatch to ack... - if (!dispatching) - serializer.execute(dispatchCallback); + serializer.execute(dispatchCallback); } @@ -102,8 +100,7 @@ void Queue::deliver(intrusive_ptr<Message>& msg){ push(msg); } QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); - if (!dispatching) - serializer.execute(dispatchCallback); + serializer.execute(dispatchCallback); } } @@ -130,8 +127,7 @@ void Queue::process(intrusive_ptr<Message>& msg){ push(msg); if (mgmtObject != 0) mgmtObject->enqueue (msg->contentSize (), mask); - if (!dispatching) - serializer.execute(dispatchCallback); + serializer.execute(dispatchCallback); } @@ -141,8 +137,7 @@ void Queue::requeue(const QueuedMessage& msg){ msg.payload->enqueueComplete(); // mark the message as enqueued messages.push_front(msg); } - if (!dispatching) - serializer.execute(dispatchCallback); + serializer.execute(dispatchCallback); } bool Queue::acquire(const QueuedMessage& msg) { @@ -158,8 +153,7 @@ bool Queue::acquire(const QueuedMessage& msg) { void Queue::requestDispatch(Consumer::ptr c){ if (!c || c->preAcquires()) { - if (!dispatching) - serializer.execute(dispatchCallback); + serializer.execute(dispatchCallback); } else { DispatchFunctor f(*this, c); serializer.execute(f); @@ -235,7 +229,6 @@ bool Queue::getNextMessage(QueuedMessage& msg) void Queue::dispatch() { - dispatching = true; QueuedMessage msg(this); while (getNextMessage(msg) && msg.payload->isEnqueueComplete()){ if (dispatch(msg)) { @@ -249,7 +242,6 @@ void Queue::dispatch() } } serviceAllBrowsers(); - dispatching = false; } void Queue::serviceAllBrowsers() diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 9a7b893f36..1e56f1b6e9 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -75,7 +75,6 @@ namespace qpid { void operator()(); }; - bool dispatching; const string name; const bool autodelete; MessageStore* const store; diff --git a/cpp/src/qpid/sys/Serializer.cpp b/cpp/src/qpid/sys/Serializer.cpp index a82982a0c8..86f901aa78 100644 --- a/cpp/src/qpid/sys/Serializer.cpp +++ b/cpp/src/qpid/sys/Serializer.cpp @@ -58,7 +58,7 @@ bool SerializerBase::running() { void SerializerBase::wait() { Mutex::ScopedLock l(lock); - lock.wait(); + if (state == IDLE) lock.wait(); } void SerializerBase::run() { |