diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 32 |
1 files changed, 26 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index f3cdc03f7d..c9ee7f394f 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -256,10 +256,30 @@ bool Queue::acquire(const QueuedMessage& msg) { return false; } +void Queue::notifyListener() +{ + QueueListeners::NotificationSet set; + { + Mutex::ScopedLock locker(messageLock); + if (messages.size()) { + listeners.populate(set); + } + } + set.notify(); +} + bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { if (c->preAcquires()) { - return consumeNextMessage(m, c); + switch (consumeNextMessage(m, c)) { + case CONSUMED: + return true; + case CANT_CONSUME: + notifyListener();//let someone else try + case NO_MESSAGES: + default: + return false; + } } else { return browseNextMessage(m, c); } @@ -291,14 +311,14 @@ bool Queue::checkForMessages(Consumer::shared_ptr c) } } -bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) +Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) { while (true) { Mutex::ScopedLock locker(messageLock); if (messages.empty()) { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); listeners.addListener(c); - return false; + return NO_MESSAGES; } else { QueuedMessage msg = getFront(); if (msg.payload->hasExpired()) { @@ -311,16 +331,16 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) if (c->accept(msg.payload)) { m = msg; popMsg(msg); - return true; + return CONSUMED; } else { //message(s) are available but consumer hasn't got enough credit QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); - return false; + return CANT_CONSUME; } } else { //consumer will never want this message QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); - return false; + return CANT_CONSUME; } } } |