diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2007-11-13 21:49:39 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2007-11-13 21:49:39 +0000 |
commit | 62c715d74189230c23c7e13f0bd71b89a18083ca (patch) | |
tree | 254e514edfc7be82da4b1096d398b0284db8e980 /cpp/src | |
parent | cce6eaa238a2bb4d64ccb4450d6f39320fe5434c (diff) | |
download | qpid-python-62c715d74189230c23c7e13f0bd71b89a18083ca.tar.gz |
- fixed sync mode deadlock
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@594655 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 20 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Serializer.h | 1 |
3 files changed, 17 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 919343b152..e6d79056cd 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -46,7 +46,8 @@ Queue::Queue(const string& _name, bool _autodelete, const ConnectionToken* const _owner, Manageable* parent) : - name(_name), + dispatching(false), + name(_name), autodelete(_autodelete), store(_store), owner(_owner), @@ -75,7 +76,8 @@ void Queue::notifyDurableIOComplete() { // signal SemanticHander to ack completed dequeues // then dispatch to ack... - serializer.execute(dispatchCallback); + if (!dispatching) + serializer.execute(dispatchCallback); } @@ -100,7 +102,8 @@ void Queue::deliver(Message::shared_ptr& msg){ push(msg); } QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); - serializer.execute(dispatchCallback); + if (!dispatching) + serializer.execute(dispatchCallback); } } @@ -127,7 +130,8 @@ void Queue::process(Message::shared_ptr& msg){ push(msg); if (mgmtObject != 0) mgmtObject->enqueue (msg->contentSize (), mask); - serializer.execute(dispatchCallback); + if (!dispatching) + serializer.execute(dispatchCallback); } @@ -137,7 +141,8 @@ void Queue::requeue(const QueuedMessage& msg){ msg.payload->enqueueComplete(); // mark the message as enqueued messages.push_front(msg); } - serializer.execute(dispatchCallback); + if (!dispatching) + serializer.execute(dispatchCallback); } bool Queue::acquire(const QueuedMessage& msg) { @@ -153,7 +158,8 @@ bool Queue::acquire(const QueuedMessage& msg) { void Queue::requestDispatch(Consumer::ptr c){ if (!c || c->preAcquires()) { - serializer.execute(dispatchCallback); + if (!dispatching) + serializer.execute(dispatchCallback); } else { DispatchFunctor f(*this, c); serializer.execute(f); @@ -229,6 +235,7 @@ bool Queue::getNextMessage(QueuedMessage& msg) void Queue::dispatch() { + dispatching = true; QueuedMessage msg(this); while (getNextMessage(msg) && msg.payload->isEnqueueComplete()){ if (dispatch(msg)) { @@ -242,6 +249,7 @@ void Queue::dispatch() } } serviceAllBrowsers(); + dispatching = false; } void Queue::serviceAllBrowsers() diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 4439ecbcc1..9eca31e4fc 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -74,7 +74,8 @@ namespace qpid { DispatchFunctor(Queue& q, Consumer::ptr c, DispatchCompletion* s = 0) : queue(q), consumer(c), sync(s) {} void operator()(); }; - + + bool dispatching; const string name; const bool autodelete; MessageStore* const store; diff --git a/cpp/src/qpid/sys/Serializer.h b/cpp/src/qpid/sys/Serializer.h index d0d34c26eb..30b7f88505 100644 --- a/cpp/src/qpid/sys/Serializer.h +++ b/cpp/src/qpid/sys/Serializer.h @@ -163,6 +163,7 @@ void Serializer<Task>::dispatch(Task& task) { assert(state == EXECUTING || state == DISPATCHING); Mutex::ScopedUnlock u(lock); // No exceptions allowed in task. + notifyWorker(); try { task(); } catch (...) { assert(0); } } |