From 95e371124817b222fc53df491469df0e8503d24c Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 17 Apr 2012 15:12:15 +0000 Subject: QPID-3950: Allow browsing of queues with exclusive subscriptions git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1327135 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/Queue.cpp | 28 ++++++++++++++++++++-------- qpid/cpp/src/qpid/broker/Queue.h | 3 ++- qpid/cpp/src/qpid/broker/SessionAdapter.cpp | 1 + qpid/cpp/src/tests/ha_tests.py | 16 +++++++++++++++- 4 files changed, 38 insertions(+), 10 deletions(-) diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index a223392e2e..4f114fa0a7 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -154,6 +154,7 @@ Queue::Queue(const string& _name, bool _autodelete, store(_store), owner(_owner), consumerCount(0), + browserCount(0), exclusive(0), noLocal(false), persistLastNode(false), @@ -523,17 +524,27 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ assertClusterSafe(); { Mutex::ScopedLock locker(messageLock); - if(exclusive) { - throw ResourceLockedException( - QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); - } else if(requestExclusive) { - if(consumerCount) { + // NOTE: consumerCount is actually a count of all + // subscriptions, both acquiring and non-acquiring (browsers). + // Check for exclusivity of acquiring consumers. + size_t acquiringConsumers = consumerCount - browserCount; + if (c->preAcquires()) { + if(exclusive) { throw ResourceLockedException( - QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); - } else { - exclusive = c->getSession(); + QPID_MSG("Queue " << getName() + << " has an exclusive consumer. No more consumers allowed.")); + } else if(requestExclusive) { + if(acquiringConsumers) { + throw ResourceLockedException( + QPID_MSG("Queue " << getName() + << " already has consumers. Exclusive access denied.")); + } else { + exclusive = c->getSession(); + } } } + else + browserCount++; consumerCount++; //reset auto deletion timer if necessary if (autoDeleteTimeout && autoDeleteTask) { @@ -550,6 +561,7 @@ void Queue::cancel(Consumer::shared_ptr c){ { Mutex::ScopedLock locker(messageLock); consumerCount--; + if (!c->preAcquires()) browserCount--; if(exclusive) exclusive = 0; observeConsumerRemove(*c, locker); } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 9b9acc677c..9869a698c1 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -97,7 +97,8 @@ class Queue : public boost::enable_shared_from_this, const bool autodelete; MessageStore* store; const OwnershipToken* owner; - uint32_t consumerCount; + uint32_t consumerCount; // Actually a count of all subscriptions, acquiring or not. + uint32_t browserCount; // Count of non-acquiring subscriptions. OwnershipToken* exclusive; bool noLocal; bool persistLastNode; diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index 7c689d56fb..69fba58353 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -410,6 +410,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, if(!destination.empty() && state.exists(destination)) throw NotAllowedException(QPID_MSG("Consumer tags must be unique")); + // We allow browsing (acquireMode == 1) of exclusive queues, this is required by HA. if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(&session) && acquireMode == 0) throw ResourceLockedException(QPID_MSG("Cannot subscribe to exclusive queue " << queue->getName())); diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 740b695278..e251e4d8c8 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -19,7 +19,7 @@ # import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math -from qpid.messaging import Message, NotFound, ConnectionError, Connection +from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection from qpid.datatypes import uuid4 from brokertest import * from threading import Thread, Lock, Condition @@ -548,6 +548,20 @@ class ReplicationTests(HaTest): c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}") self.wait_backup(cluster[1], "q") + def test_exclusive_queue(self): + """Ensure that we can back-up exclusive queues, i.e. the replicating + subscriptions are exempt from the exclusivity""" + cluster = HaCluster(self, 2) + def test(addr): + c = cluster[0].connect() + q = addr.split(";")[0] + r = c.session().receiver(addr) + try: c.session().receiver(addr); self.fail("Expected exclusive exception") + except ReceiverError: pass + s = c.session().sender(q).send(q) + self.assert_browse_backup(cluster[1], q, [q]) + test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}"); + test("excl_queue;{create:always, node:{x-declare:{exclusive:True}}}") def fairshare(msgs, limit, levels): """ -- cgit v1.2.1