summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-11-19 12:18:26 +0000
committerGordon Sim <gsim@apache.org>2007-11-19 12:18:26 +0000
commit73eee018d301031a212fe3c8a8127b84c2b580ac (patch)
tree8f6361112c086c11613601fa699c910b5b5e8e8d /cpp/src
parent2c3e3bf4c62267ac6a0fe1f5d6a6288a927ace0b (diff)
downloadqpid-python-73eee018d301031a212fe3c8a8127b84c2b580ac.tar.gz
Fixes causing lost 'events' in queue dispatch
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@596277 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp20
-rw-r--r--cpp/src/qpid/broker/Queue.h1
-rw-r--r--cpp/src/qpid/sys/Serializer.cpp2
3 files changed, 7 insertions, 16 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 757f0aa62d..41a5767457 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -46,8 +46,7 @@ Queue::Queue(const string& _name, bool _autodelete,
const ConnectionToken* const _owner,
Manageable* parent) :
- dispatching(false),
- name(_name),
+ name(_name),
autodelete(_autodelete),
store(_store),
owner(_owner),
@@ -76,8 +75,7 @@ void Queue::notifyDurableIOComplete()
{
// signal SemanticHander to ack completed dequeues
// then dispatch to ack...
- if (!dispatching)
- serializer.execute(dispatchCallback);
+ serializer.execute(dispatchCallback);
}
@@ -102,8 +100,7 @@ void Queue::deliver(intrusive_ptr<Message>& msg){
push(msg);
}
QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
- if (!dispatching)
- serializer.execute(dispatchCallback);
+ serializer.execute(dispatchCallback);
}
}
@@ -130,8 +127,7 @@ void Queue::process(intrusive_ptr<Message>& msg){
push(msg);
if (mgmtObject != 0)
mgmtObject->enqueue (msg->contentSize (), mask);
- if (!dispatching)
- serializer.execute(dispatchCallback);
+ serializer.execute(dispatchCallback);
}
@@ -141,8 +137,7 @@ void Queue::requeue(const QueuedMessage& msg){
msg.payload->enqueueComplete(); // mark the message as enqueued
messages.push_front(msg);
}
- if (!dispatching)
- serializer.execute(dispatchCallback);
+ serializer.execute(dispatchCallback);
}
bool Queue::acquire(const QueuedMessage& msg) {
@@ -158,8 +153,7 @@ bool Queue::acquire(const QueuedMessage& msg) {
void Queue::requestDispatch(Consumer::ptr c){
if (!c || c->preAcquires()) {
- if (!dispatching)
- serializer.execute(dispatchCallback);
+ serializer.execute(dispatchCallback);
} else {
DispatchFunctor f(*this, c);
serializer.execute(f);
@@ -235,7 +229,6 @@ bool Queue::getNextMessage(QueuedMessage& msg)
void Queue::dispatch()
{
- dispatching = true;
QueuedMessage msg(this);
while (getNextMessage(msg) && msg.payload->isEnqueueComplete()){
if (dispatch(msg)) {
@@ -249,7 +242,6 @@ void Queue::dispatch()
}
}
serviceAllBrowsers();
- dispatching = false;
}
void Queue::serviceAllBrowsers()
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 9a7b893f36..1e56f1b6e9 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -75,7 +75,6 @@ namespace qpid {
void operator()();
};
- bool dispatching;
const string name;
const bool autodelete;
MessageStore* const store;
diff --git a/cpp/src/qpid/sys/Serializer.cpp b/cpp/src/qpid/sys/Serializer.cpp
index a82982a0c8..86f901aa78 100644
--- a/cpp/src/qpid/sys/Serializer.cpp
+++ b/cpp/src/qpid/sys/Serializer.cpp
@@ -58,7 +58,7 @@ bool SerializerBase::running() {
void SerializerBase::wait() {
Mutex::ScopedLock l(lock);
- lock.wait();
+ if (state == IDLE) lock.wait();
}
void SerializerBase::run() {