diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 103 |
1 files changed, 82 insertions, 21 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index df34669dc2..2444684d7e 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -151,15 +151,33 @@ void Queue::flush(DispatchCompletion& completion) serializer.execute(f); } +/** + * Return true if the message can be excluded. This is currently the + * case if the queue has an exclusive consumer that will never want + * the message, or if the queue is exclusive to a single connection + * and has a single consumer (covers the JMS topic case). + */ +bool Queue::exclude(Message::shared_ptr msg) +{ + RWlock::ScopedWlock locker(consumerLock); + if (exclusive) { + return !exclusive->filter(msg); + } else if (hasExclusiveOwner() && acquirers.size() == 1) { + return !acquirers[0]->filter(msg); + } else { + return false; + } +} + Consumer::ptr Queue::allocate() { RWlock::ScopedWlock locker(consumerLock); - if(acquirers.empty()){ + if (acquirers.empty()) { return Consumer::ptr(); - }else if(exclusive){ + } else if (exclusive){ return exclusive; - }else{ + } else { next = next % acquirers.size(); return acquirers[next++]; } @@ -171,9 +189,9 @@ bool Queue::dispatch(QueuedMessage& msg) //request, so won't result in anyone being missed uint counter = getAcquirerCount(); Consumer::ptr c = allocate(); - while(c && counter--){ - if(c->deliver(msg)) { - return true; + while (c && counter--){ + if (c->deliver(msg)) { + return true; } else { c = allocate(); } @@ -181,22 +199,31 @@ bool Queue::dispatch(QueuedMessage& msg) return false; } -void Queue::dispatch(){ +bool Queue::getNextMessage(QueuedMessage& msg) +{ + Mutex::ScopedLock locker(messageLock); + if (messages.empty()) { + QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); + return false; + } else { + msg = messages.front(); + return true; + } +} + +void Queue::dispatch() +{ QueuedMessage msg; - while(true){ - { - Mutex::ScopedLock locker(messageLock); - if (messages.empty()) { - QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); - break; - } - msg = messages.front(); - } - if( msg.payload->isEnqueueComplete() && dispatch(msg) ) { - pop(); - } else { - break; - } + while (getNextMessage(msg) && msg.payload->isEnqueueComplete()){ + if (dispatch(msg)) { + pop(); + } else if (exclude(msg.payload)) { + pop(); + dequeue(0, msg.payload); + QPID_LOG(debug, "Message " << msg.payload << " filtered out of " << name << "[" << this << "]"); + } else { + break; + } } serviceAllBrowsers(); } @@ -479,3 +506,37 @@ void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) } } + +bool Queue::isExclusiveOwner(const ConnectionToken* const o) const +{ + Mutex::ScopedLock locker(ownershipLock); + return o == owner; +} + +void Queue::releaseExclusiveOwnership() +{ + Mutex::ScopedLock locker(ownershipLock); + owner = 0; +} + +bool Queue::setExclusiveOwner(const ConnectionToken* const o) +{ + Mutex::ScopedLock locker(ownershipLock); + if (owner) { + return false; + } else { + owner = o; + return true; + } +} + +bool Queue::hasExclusiveOwner() const +{ + Mutex::ScopedLock locker(ownershipLock); + return owner != 0; +} + +bool Queue::hasExclusiveConsumer() const +{ + return exclusive; +} |