summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-04-17 15:12:15 +0000
committerAlan Conway <aconway@apache.org>2012-04-17 15:12:15 +0000
commit95e371124817b222fc53df491469df0e8503d24c (patch)
tree4b50fdeffc3bb75e1b0ad2ba6bfa7fe76b019020
parentc8d206abb661ade5e113ba7950e5d8b90a0b29ef (diff)
downloadqpid-python-95e371124817b222fc53df491469df0e8503d24c.tar.gz
QPID-3950: Allow browsing of queues with exclusive subscriptions
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1327135 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp28
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h3
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp1
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py16
4 files changed, 38 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index a223392e2e..4f114fa0a7 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -154,6 +154,7 @@ Queue::Queue(const string& _name, bool _autodelete,
store(_store),
owner(_owner),
consumerCount(0),
+ browserCount(0),
exclusive(0),
noLocal(false),
persistLastNode(false),
@@ -523,17 +524,27 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
assertClusterSafe();
{
Mutex::ScopedLock locker(messageLock);
- if(exclusive) {
- throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
- } else if(requestExclusive) {
- if(consumerCount) {
+ // NOTE: consumerCount is actually a count of all
+ // subscriptions, both acquiring and non-acquiring (browsers).
+ // Check for exclusivity of acquiring consumers.
+ size_t acquiringConsumers = consumerCount - browserCount;
+ if (c->preAcquires()) {
+ if(exclusive) {
throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
- } else {
- exclusive = c->getSession();
+ QPID_MSG("Queue " << getName()
+ << " has an exclusive consumer. No more consumers allowed."));
+ } else if(requestExclusive) {
+ if(acquiringConsumers) {
+ throw ResourceLockedException(
+ QPID_MSG("Queue " << getName()
+ << " already has consumers. Exclusive access denied."));
+ } else {
+ exclusive = c->getSession();
+ }
}
}
+ else
+ browserCount++;
consumerCount++;
//reset auto deletion timer if necessary
if (autoDeleteTimeout && autoDeleteTask) {
@@ -550,6 +561,7 @@ void Queue::cancel(Consumer::shared_ptr c){
{
Mutex::ScopedLock locker(messageLock);
consumerCount--;
+ if (!c->preAcquires()) browserCount--;
if(exclusive) exclusive = 0;
observeConsumerRemove(*c, locker);
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 9b9acc677c..9869a698c1 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -97,7 +97,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
const bool autodelete;
MessageStore* store;
const OwnershipToken* owner;
- uint32_t consumerCount;
+ uint32_t consumerCount; // Actually a count of all subscriptions, acquiring or not.
+ uint32_t browserCount; // Count of non-acquiring subscriptions.
OwnershipToken* exclusive;
bool noLocal;
bool persistLastNode;
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index 7c689d56fb..69fba58353 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -410,6 +410,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName,
if(!destination.empty() && state.exists(destination))
throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
+ // We allow browsing (acquireMode == 1) of exclusive queues, this is required by HA.
if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(&session) && acquireMode == 0)
throw ResourceLockedException(QPID_MSG("Cannot subscribe to exclusive queue "
<< queue->getName()));
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 740b695278..e251e4d8c8 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -19,7 +19,7 @@
#
import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math
-from qpid.messaging import Message, NotFound, ConnectionError, Connection
+from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection
from qpid.datatypes import uuid4
from brokertest import *
from threading import Thread, Lock, Condition
@@ -548,6 +548,20 @@ class ReplicationTests(HaTest):
c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}")
self.wait_backup(cluster[1], "q")
+ def test_exclusive_queue(self):
+ """Ensure that we can back-up exclusive queues, i.e. the replicating
+ subscriptions are exempt from the exclusivity"""
+ cluster = HaCluster(self, 2)
+ def test(addr):
+ c = cluster[0].connect()
+ q = addr.split(";")[0]
+ r = c.session().receiver(addr)
+ try: c.session().receiver(addr); self.fail("Expected exclusive exception")
+ except ReceiverError: pass
+ s = c.session().sender(q).send(q)
+ self.assert_browse_backup(cluster[1], q, [q])
+ test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}");
+ test("excl_queue;{create:always, node:{x-declare:{exclusive:True}}}")
def fairshare(msgs, limit, levels):
"""