summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-09-17 15:52:07 +0000
committerAlan Conway <aconway@apache.org>2012-09-17 15:52:07 +0000
commite0ae72a0449e026b5a15062de8f3252b62271b96 (patch)
treebca75b95bfd28b4e34f7918290d3002b0712e193 /cpp/src
parent68b49842c625d7335ba81e59dcda6a691f3b07fe (diff)
downloadqpid-python-e0ae72a0449e026b5a15062de8f3252b62271b96.tar.gz
QPID-4290: HA auto-delete queues are not deleted (Author: Andy Goldstein)
ReplicatingSubscription was being counted as a consumer and preventing auto-delete queues from being deleted. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1386672 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Consumer.h7
-rw-r--r--cpp/src/qpid/broker/Queue.cpp28
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.h2
-rwxr-xr-xcpp/src/tests/ha_tests.py30
4 files changed, 45 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h
index 64fc4288af..d21dbb19f0 100644
--- a/cpp/src/qpid/broker/Consumer.h
+++ b/cpp/src/qpid/broker/Consumer.h
@@ -79,6 +79,13 @@ class Consumer : public QueueCursor {
*/
virtual bool hideDeletedError() { return false; }
+ /** If false, the consumer is not counted for purposes of auto-deletion or
+ * immediate messages. This is used for "system" consumers that are created
+ * by the broker for internal purposes as opposed to consumers that are
+ * created by normal clients.
+ */
+ virtual bool isCounted() { return true; }
+
protected:
//framing::SequenceNumber position;
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 8a3847d68c..9be3a1acac 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -501,22 +501,29 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive)
}
}
}
- else
+ else if(c->isCounted()) {
browserCount++;
- consumerCount++;
- //reset auto deletion timer if necessary
- if (settings.autoDeleteDelay && autoDeleteTask) {
- autoDeleteTask->cancel();
}
- observeConsumerAdd(*c, locker);
+ if(c->isCounted()) {
+ consumerCount++;
+
+ //reset auto deletion timer if necessary
+ if (settings.autoDeleteDelay && autoDeleteTask) {
+ autoDeleteTask->cancel();
+ }
+
+ observeConsumerAdd(*c, locker);
+ }
+ }
+ if (mgmtObject != 0 && c->isCounted()) {
+ mgmtObject->inc_consumerCount();
}
- if (mgmtObject != 0)
- mgmtObject->inc_consumerCount ();
}
void Queue::cancel(Consumer::shared_ptr c)
{
removeListener(c);
+ if(c->isCounted())
{
Mutex::ScopedLock locker(messageLock);
consumerCount--;
@@ -524,8 +531,9 @@ void Queue::cancel(Consumer::shared_ptr c)
if(exclusive) exclusive = 0;
observeConsumerRemove(*c, locker);
}
- if (mgmtObject != 0)
- mgmtObject->dec_consumerCount ();
+ if (mgmtObject != 0 && c->isCounted()) {
+ mgmtObject->dec_consumerCount();
+ }
}
/**
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.h b/cpp/src/qpid/ha/ReplicatingSubscription.h
index c8e2c4f457..4f54ffce96 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -114,6 +114,8 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
// Hide the "queue deleted" error for a ReplicatingSubscription when a
// queue is deleted, this is normal and not an error.
bool hideDeletedError() { return true; }
+ // Not counted for auto deletion and immediate message purposes.
+ bool isCounted() { return false; }
/** Initialization that must be done separately from construction
* because it requires a shared_ptr to this to exist.
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py
index 92442b465a..e32077f219 100755
--- a/cpp/src/tests/ha_tests.py
+++ b/cpp/src/tests/ha_tests.py
@@ -677,20 +677,26 @@ class ReplicationTests(BrokerTest):
def test_auto_delete_exclusive(self):
"""Verify that we ignore auto-delete, exclusive, non-auto-delete-timeout queues"""
- cluster = HaCluster(self,2)
- s = cluster[0].connect().session()
- s.receiver("exad;{create:always,node:{x-declare:{exclusive:True,auto-delete:True}}}")
- s.receiver("ex;{create:always,node:{x-declare:{exclusive:True}}}")
- s.receiver("ad;{create:always,node:{x-declare:{auto-delete:True}}}")
- s.receiver("time;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}")
- s.receiver("q;{create:always}")
+ cluster = HaCluster(self, 2)
+ s0 = cluster[0].connect().session()
+ s0.receiver("exad;{create:always,node:{x-declare:{exclusive:True,auto-delete:True}}}")
+ s0.receiver("ex;{create:always,node:{x-declare:{exclusive:True}}}")
+ ad = s0.receiver("ad;{create:always,node:{x-declare:{auto-delete:True}}}")
+ s0.receiver("time;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}")
+ s0.receiver("q;{create:always}")
- s = cluster[1].connect_admin().session()
+ s1 = cluster[1].connect_admin().session()
cluster[1].wait_backup("q")
- assert not valid_address(s, "exad")
- assert valid_address(s, "ex")
- assert valid_address(s, "ad")
- assert valid_address(s, "time")
+ assert not valid_address(s1, "exad")
+ assert valid_address(s1, "ex")
+ assert valid_address(s1, "ad")
+ assert valid_address(s1, "time")
+
+ # Verify that auto-delete queues are not kept alive by
+ # replicating subscriptions
+ ad.close()
+ s0.sync()
+ assert not valid_address(s0, "ad")
def test_broker_info(self):
"""Check that broker information is correctly published via management"""