summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp20
1 files changed, 14 insertions, 6 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 919343b152..e6d79056cd 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -46,7 +46,8 @@ Queue::Queue(const string& _name, bool _autodelete,
const ConnectionToken* const _owner,
Manageable* parent) :
- name(_name),
+ dispatching(false),
+ name(_name),
autodelete(_autodelete),
store(_store),
owner(_owner),
@@ -75,7 +76,8 @@ void Queue::notifyDurableIOComplete()
{
// signal SemanticHander to ack completed dequeues
// then dispatch to ack...
- serializer.execute(dispatchCallback);
+ if (!dispatching)
+ serializer.execute(dispatchCallback);
}
@@ -100,7 +102,8 @@ void Queue::deliver(Message::shared_ptr& msg){
push(msg);
}
QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
- serializer.execute(dispatchCallback);
+ if (!dispatching)
+ serializer.execute(dispatchCallback);
}
}
@@ -127,7 +130,8 @@ void Queue::process(Message::shared_ptr& msg){
push(msg);
if (mgmtObject != 0)
mgmtObject->enqueue (msg->contentSize (), mask);
- serializer.execute(dispatchCallback);
+ if (!dispatching)
+ serializer.execute(dispatchCallback);
}
@@ -137,7 +141,8 @@ void Queue::requeue(const QueuedMessage& msg){
msg.payload->enqueueComplete(); // mark the message as enqueued
messages.push_front(msg);
}
- serializer.execute(dispatchCallback);
+ if (!dispatching)
+ serializer.execute(dispatchCallback);
}
bool Queue::acquire(const QueuedMessage& msg) {
@@ -153,7 +158,8 @@ bool Queue::acquire(const QueuedMessage& msg) {
void Queue::requestDispatch(Consumer::ptr c){
if (!c || c->preAcquires()) {
- serializer.execute(dispatchCallback);
+ if (!dispatching)
+ serializer.execute(dispatchCallback);
} else {
DispatchFunctor f(*this, c);
serializer.execute(f);
@@ -229,6 +235,7 @@ bool Queue::getNextMessage(QueuedMessage& msg)
void Queue::dispatch()
{
+ dispatching = true;
QueuedMessage msg(this);
while (getNextMessage(msg) && msg.payload->isEnqueueComplete()){
if (dispatch(msg)) {
@@ -242,6 +249,7 @@ void Queue::dispatch()
}
}
serviceAllBrowsers();
+ dispatching = false;
}
void Queue::serviceAllBrowsers()