diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 20 |
1 files changed, 14 insertions, 6 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 919343b152..e6d79056cd 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -46,7 +46,8 @@ Queue::Queue(const string& _name, bool _autodelete, const ConnectionToken* const _owner, Manageable* parent) : - name(_name), + dispatching(false), + name(_name), autodelete(_autodelete), store(_store), owner(_owner), @@ -75,7 +76,8 @@ void Queue::notifyDurableIOComplete() { // signal SemanticHander to ack completed dequeues // then dispatch to ack... - serializer.execute(dispatchCallback); + if (!dispatching) + serializer.execute(dispatchCallback); } @@ -100,7 +102,8 @@ void Queue::deliver(Message::shared_ptr& msg){ push(msg); } QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); - serializer.execute(dispatchCallback); + if (!dispatching) + serializer.execute(dispatchCallback); } } @@ -127,7 +130,8 @@ void Queue::process(Message::shared_ptr& msg){ push(msg); if (mgmtObject != 0) mgmtObject->enqueue (msg->contentSize (), mask); - serializer.execute(dispatchCallback); + if (!dispatching) + serializer.execute(dispatchCallback); } @@ -137,7 +141,8 @@ void Queue::requeue(const QueuedMessage& msg){ msg.payload->enqueueComplete(); // mark the message as enqueued messages.push_front(msg); } - serializer.execute(dispatchCallback); + if (!dispatching) + serializer.execute(dispatchCallback); } bool Queue::acquire(const QueuedMessage& msg) { @@ -153,7 +158,8 @@ bool Queue::acquire(const QueuedMessage& msg) { void Queue::requestDispatch(Consumer::ptr c){ if (!c || c->preAcquires()) { - serializer.execute(dispatchCallback); + if (!dispatching) + serializer.execute(dispatchCallback); } else { DispatchFunctor f(*this, c); serializer.execute(f); @@ -229,6 +235,7 @@ bool Queue::getNextMessage(QueuedMessage& msg) void Queue::dispatch() { + dispatching = true; QueuedMessage msg(this); while (getNextMessage(msg) && msg.payload->isEnqueueComplete()){ if (dispatch(msg)) { @@ -242,6 +249,7 @@ void Queue::dispatch() } } serviceAllBrowsers(); + dispatching = false; } void Queue::serviceAllBrowsers() |