summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp20
1 files changed, 18 insertions, 2 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 3ba76f656e..22908afd8e 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -257,6 +257,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
msgCredit(0),
byteCredit(0),
notifyEnabled(true),
+ queueHasMessages(true),
syncFrequency(_arguments.getAsInt("qpid.sync_frequency")),
deliveryCount(0) {}
@@ -524,7 +525,7 @@ void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
bool SemanticState::ConsumerImpl::haveCredit()
{
- if (msgCredit) {
+ if (msgCredit && byteCredit) {
return true;
} else {
blocked = true;
@@ -592,7 +593,18 @@ bool SemanticState::ConsumerImpl::hasOutput() {
bool SemanticState::ConsumerImpl::doOutput()
{
- return haveCredit() && queue->dispatch(shared_from_this());
+ {
+ Mutex::ScopedLock l(lock);
+ if (!haveCredit() || !queueHasMessages) return false;
+ queueHasMessages = false;
+ }
+ bool moreMessages = queue->dispatch(shared_from_this());
+ {
+ Mutex::ScopedLock l(lock);
+ // queueHasMessages may have been set by a notify() during dispatch()
+ queueHasMessages = queueHasMessages || moreMessages;
+ }
+ return queueHasMessages;
}
void SemanticState::ConsumerImpl::enableNotify()
@@ -614,6 +626,10 @@ bool SemanticState::ConsumerImpl::isNotifyEnabled() const {
void SemanticState::ConsumerImpl::notify()
{
+ {
+ Mutex::ScopedLock l(lock);
+ queueHasMessages = true;
+ }
//TODO: alter this, don't want to hold locks across external
//calls; for now its is required to protect the notify() from
//having part of the object chain of the invocation being