summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2007-11-13 21:49:39 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2007-11-13 21:49:39 +0000
commit62c715d74189230c23c7e13f0bd71b89a18083ca (patch)
tree254e514edfc7be82da4b1096d398b0284db8e980
parentcce6eaa238a2bb4d64ccb4450d6f39320fe5434c (diff)
downloadqpid-python-62c715d74189230c23c7e13f0bd71b89a18083ca.tar.gz
- fixed sync mode deadlock
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@594655 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/Queue.cpp20
-rw-r--r--cpp/src/qpid/broker/Queue.h3
-rw-r--r--cpp/src/qpid/sys/Serializer.h1
3 files changed, 17 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 919343b152..e6d79056cd 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -46,7 +46,8 @@ Queue::Queue(const string& _name, bool _autodelete,
const ConnectionToken* const _owner,
Manageable* parent) :
- name(_name),
+ dispatching(false),
+ name(_name),
autodelete(_autodelete),
store(_store),
owner(_owner),
@@ -75,7 +76,8 @@ void Queue::notifyDurableIOComplete()
{
// signal SemanticHander to ack completed dequeues
// then dispatch to ack...
- serializer.execute(dispatchCallback);
+ if (!dispatching)
+ serializer.execute(dispatchCallback);
}
@@ -100,7 +102,8 @@ void Queue::deliver(Message::shared_ptr& msg){
push(msg);
}
QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
- serializer.execute(dispatchCallback);
+ if (!dispatching)
+ serializer.execute(dispatchCallback);
}
}
@@ -127,7 +130,8 @@ void Queue::process(Message::shared_ptr& msg){
push(msg);
if (mgmtObject != 0)
mgmtObject->enqueue (msg->contentSize (), mask);
- serializer.execute(dispatchCallback);
+ if (!dispatching)
+ serializer.execute(dispatchCallback);
}
@@ -137,7 +141,8 @@ void Queue::requeue(const QueuedMessage& msg){
msg.payload->enqueueComplete(); // mark the message as enqueued
messages.push_front(msg);
}
- serializer.execute(dispatchCallback);
+ if (!dispatching)
+ serializer.execute(dispatchCallback);
}
bool Queue::acquire(const QueuedMessage& msg) {
@@ -153,7 +158,8 @@ bool Queue::acquire(const QueuedMessage& msg) {
void Queue::requestDispatch(Consumer::ptr c){
if (!c || c->preAcquires()) {
- serializer.execute(dispatchCallback);
+ if (!dispatching)
+ serializer.execute(dispatchCallback);
} else {
DispatchFunctor f(*this, c);
serializer.execute(f);
@@ -229,6 +235,7 @@ bool Queue::getNextMessage(QueuedMessage& msg)
void Queue::dispatch()
{
+ dispatching = true;
QueuedMessage msg(this);
while (getNextMessage(msg) && msg.payload->isEnqueueComplete()){
if (dispatch(msg)) {
@@ -242,6 +249,7 @@ void Queue::dispatch()
}
}
serviceAllBrowsers();
+ dispatching = false;
}
void Queue::serviceAllBrowsers()
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 4439ecbcc1..9eca31e4fc 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -74,7 +74,8 @@ namespace qpid {
DispatchFunctor(Queue& q, Consumer::ptr c, DispatchCompletion* s = 0) : queue(q), consumer(c), sync(s) {}
void operator()();
};
-
+
+ bool dispatching;
const string name;
const bool autodelete;
MessageStore* const store;
diff --git a/cpp/src/qpid/sys/Serializer.h b/cpp/src/qpid/sys/Serializer.h
index d0d34c26eb..30b7f88505 100644
--- a/cpp/src/qpid/sys/Serializer.h
+++ b/cpp/src/qpid/sys/Serializer.h
@@ -163,6 +163,7 @@ void Serializer<Task>::dispatch(Task& task) {
assert(state == EXECUTING || state == DISPATCHING);
Mutex::ScopedUnlock u(lock);
// No exceptions allowed in task.
+ notifyWorker();
try { task(); } catch (...) { assert(0); }
}