summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp32
1 files changed, 26 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index f3cdc03f7d..c9ee7f394f 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -256,10 +256,30 @@ bool Queue::acquire(const QueuedMessage& msg) {
return false;
}
+void Queue::notifyListener()
+{
+ QueueListeners::NotificationSet set;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ if (messages.size()) {
+ listeners.populate(set);
+ }
+ }
+ set.notify();
+}
+
bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
if (c->preAcquires()) {
- return consumeNextMessage(m, c);
+ switch (consumeNextMessage(m, c)) {
+ case CONSUMED:
+ return true;
+ case CANT_CONSUME:
+ notifyListener();//let someone else try
+ case NO_MESSAGES:
+ default:
+ return false;
+ }
} else {
return browseNextMessage(m, c);
}
@@ -291,14 +311,14 @@ bool Queue::checkForMessages(Consumer::shared_ptr c)
}
}
-bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
+Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
while (true) {
Mutex::ScopedLock locker(messageLock);
if (messages.empty()) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
listeners.addListener(c);
- return false;
+ return NO_MESSAGES;
} else {
QueuedMessage msg = getFront();
if (msg.payload->hasExpired()) {
@@ -311,16 +331,16 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
if (c->accept(msg.payload)) {
m = msg;
popMsg(msg);
- return true;
+ return CONSUMED;
} else {
//message(s) are available but consumer hasn't got enough credit
QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
- return false;
+ return CANT_CONSUME;
}
} else {
//consumer will never want this message
QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
- return false;
+ return CANT_CONSUME;
}
}
}