summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-05-25 15:51:52 +0000
committerAlan Conway <aconway@apache.org>2009-05-25 15:51:52 +0000
commit2d5c06afc92328e4f63ccfde192c7bc786d543fc (patch)
tree19edc2ceb34acaffe61a05b8a9df22d1d7cd14f4
parent066ccfb7e76b8d63215265b23efcb29ec270f5fe (diff)
downloadqpid-python-2d5c06afc92328e4f63ccfde192c7bc786d543fc.tar.gz
ConsumerImpl optimization - only dispatch on queue if we were notified of messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@778443 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp20
-rw-r--r--cpp/src/qpid/broker/SemanticState.h3
2 files changed, 21 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 3ba76f656e..22908afd8e 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -257,6 +257,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
msgCredit(0),
byteCredit(0),
notifyEnabled(true),
+ queueHasMessages(true),
syncFrequency(_arguments.getAsInt("qpid.sync_frequency")),
deliveryCount(0) {}
@@ -524,7 +525,7 @@ void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
bool SemanticState::ConsumerImpl::haveCredit()
{
- if (msgCredit) {
+ if (msgCredit && byteCredit) {
return true;
} else {
blocked = true;
@@ -592,7 +593,18 @@ bool SemanticState::ConsumerImpl::hasOutput() {
bool SemanticState::ConsumerImpl::doOutput()
{
- return haveCredit() && queue->dispatch(shared_from_this());
+ {
+ Mutex::ScopedLock l(lock);
+ if (!haveCredit() || !queueHasMessages) return false;
+ queueHasMessages = false;
+ }
+ bool moreMessages = queue->dispatch(shared_from_this());
+ {
+ Mutex::ScopedLock l(lock);
+ // queueHasMessages may have been set by a notify() during dispatch()
+ queueHasMessages = queueHasMessages || moreMessages;
+ }
+ return queueHasMessages;
}
void SemanticState::ConsumerImpl::enableNotify()
@@ -614,6 +626,10 @@ bool SemanticState::ConsumerImpl::isNotifyEnabled() const {
void SemanticState::ConsumerImpl::notify()
{
+ {
+ Mutex::ScopedLock l(lock);
+ queueHasMessages = true;
+ }
//TODO: alter this, don't want to hold locks across external
//calls; for now its is required to protect the notify() from
//having part of the object chain of the invocation being
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 35f8b4392f..8237f22e3f 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -36,6 +36,7 @@
#include "qpid/framing/Uuid.h"
#include "qpid/sys/AggregateOutput.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/sys/AtomicValue.h"
#include "AclModule.h"
#include <list>
@@ -76,6 +77,8 @@ class SemanticState : public sys::OutputTask,
uint32_t msgCredit;
uint32_t byteCredit;
bool notifyEnabled;
+ // sys::AtomicValue<bool> queueHasMessages;
+ bool queueHasMessages;
const int syncFrequency;
int deliveryCount;