summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp103
1 files changed, 82 insertions, 21 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index df34669dc2..2444684d7e 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -151,15 +151,33 @@ void Queue::flush(DispatchCompletion& completion)
serializer.execute(f);
}
+/**
+ * Return true if the message can be excluded. This is currently the
+ * case if the queue has an exclusive consumer that will never want
+ * the message, or if the queue is exclusive to a single connection
+ * and has a single consumer (covers the JMS topic case).
+ */
+bool Queue::exclude(Message::shared_ptr msg)
+{
+ RWlock::ScopedWlock locker(consumerLock);
+ if (exclusive) {
+ return !exclusive->filter(msg);
+ } else if (hasExclusiveOwner() && acquirers.size() == 1) {
+ return !acquirers[0]->filter(msg);
+ } else {
+ return false;
+ }
+}
+
Consumer::ptr Queue::allocate()
{
RWlock::ScopedWlock locker(consumerLock);
- if(acquirers.empty()){
+ if (acquirers.empty()) {
return Consumer::ptr();
- }else if(exclusive){
+ } else if (exclusive){
return exclusive;
- }else{
+ } else {
next = next % acquirers.size();
return acquirers[next++];
}
@@ -171,9 +189,9 @@ bool Queue::dispatch(QueuedMessage& msg)
//request, so won't result in anyone being missed
uint counter = getAcquirerCount();
Consumer::ptr c = allocate();
- while(c && counter--){
- if(c->deliver(msg)) {
- return true;
+ while (c && counter--){
+ if (c->deliver(msg)) {
+ return true;
} else {
c = allocate();
}
@@ -181,22 +199,31 @@ bool Queue::dispatch(QueuedMessage& msg)
return false;
}
-void Queue::dispatch(){
+bool Queue::getNextMessage(QueuedMessage& msg)
+{
+ Mutex::ScopedLock locker(messageLock);
+ if (messages.empty()) {
+ QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+ return false;
+ } else {
+ msg = messages.front();
+ return true;
+ }
+}
+
+void Queue::dispatch()
+{
QueuedMessage msg;
- while(true){
- {
- Mutex::ScopedLock locker(messageLock);
- if (messages.empty()) {
- QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
- break;
- }
- msg = messages.front();
- }
- if( msg.payload->isEnqueueComplete() && dispatch(msg) ) {
- pop();
- } else {
- break;
- }
+ while (getNextMessage(msg) && msg.payload->isEnqueueComplete()){
+ if (dispatch(msg)) {
+ pop();
+ } else if (exclude(msg.payload)) {
+ pop();
+ dequeue(0, msg.payload);
+ QPID_LOG(debug, "Message " << msg.payload << " filtered out of " << name << "[" << this << "]");
+ } else {
+ break;
+ }
}
serviceAllBrowsers();
}
@@ -479,3 +506,37 @@ void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
}
}
+
+bool Queue::isExclusiveOwner(const ConnectionToken* const o) const
+{
+ Mutex::ScopedLock locker(ownershipLock);
+ return o == owner;
+}
+
+void Queue::releaseExclusiveOwnership()
+{
+ Mutex::ScopedLock locker(ownershipLock);
+ owner = 0;
+}
+
+bool Queue::setExclusiveOwner(const ConnectionToken* const o)
+{
+ Mutex::ScopedLock locker(ownershipLock);
+ if (owner) {
+ return false;
+ } else {
+ owner = o;
+ return true;
+ }
+}
+
+bool Queue::hasExclusiveOwner() const
+{
+ Mutex::ScopedLock locker(ownershipLock);
+ return owner != 0;
+}
+
+bool Queue::hasExclusiveConsumer() const
+{
+ return exclusive;
+}