summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/brokertest.py4
-rwxr-xr-xcpp/src/tests/ha_test.py21
-rwxr-xr-xcpp/src/tests/ha_tests.py72
3 files changed, 88 insertions, 9 deletions
diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py
index 0ab0d13424..0597a933a3 100644
--- a/cpp/src/tests/brokertest.py
+++ b/cpp/src/tests/brokertest.py
@@ -203,8 +203,8 @@ class Popen(subprocess.Popen):
self.wait()
def kill(self):
- try:
- subprocess.Popen.kill(self)
+ self.expect = EXPECT_EXIT_FAIL
+ try: subprocess.Popen.kill(self)
except AttributeError: # No terminate method
try:
os.kill( self.pid , signal.SIGKILL)
diff --git a/cpp/src/tests/ha_test.py b/cpp/src/tests/ha_test.py
index 462c90bfb3..5cf28f6ef9 100755
--- a/cpp/src/tests/ha_test.py
+++ b/cpp/src/tests/ha_test.py
@@ -129,6 +129,16 @@ class HaBroker(Broker):
assert retry(try_get_status, timeout=20), "%s expected=%r, actual=%r"%(
self, status, self._status)
+ def wait_queue(self, queue, timeout=1):
+ """ Wait for queue to be visible via QMF"""
+ agent = self.agent()
+ assert retry(lambda: agent.getQueue(queue) is not None, timeout=timeout)
+
+ def wait_no_queue(self, queue, timeout=1):
+ """ Wait for queue to be invisible via QMF"""
+ agent = self.agent()
+ assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout)
+
# FIXME aconway 2012-05-01: do direct python call to qpid-config code.
def qpid_config(self, args):
assert subprocess.call(
@@ -185,6 +195,9 @@ class HaBroker(Broker):
def ready(self):
return Broker.ready(self, client_properties={"qpid.ha-admin":1})
+ def kill(self):
+ self._agent = None
+ return Broker.kill(self)
class HaCluster(object):
_cluster_count = 0
@@ -225,7 +238,6 @@ class HaCluster(object):
def kill(self, i, promote_next=True):
"""Kill broker i, promote broker i+1"""
- self[i].expect = EXPECT_EXIT_FAIL
self[i].kill()
if promote_next: self[(i+1) % len(self)].promote()
@@ -254,12 +266,11 @@ class HaCluster(object):
def __getitem__(self,index): return self._brokers[index]
def __iter__(self): return self._brokers.__iter__()
+
def wait_address(session, address):
"""Wait for an address to become valid."""
def check():
- try:
- session.sender(address)
- return True
+ try: session.sender(address); return True
except NotFound: return False
assert retry(check), "Timed out waiting for address %s"%(address)
@@ -269,3 +280,5 @@ def valid_address(session, address):
session.receiver(address)
return True
except NotFound: return False
+
+
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py
index 7ce0d1701a..63fd48b66c 100755
--- a/cpp/src/tests/ha_tests.py
+++ b/cpp/src/tests/ha_tests.py
@@ -652,9 +652,9 @@ acl deny all all
self.assertRaises(NotFound, s.receiver, ("e2"));
- def test_auto_delete_qpid_4285(self):
- """Regression test for QPID-4285: an auto delete queue gets stuck in
- a partially deleted state and causes replication errors."""
+ def test_delete_qpid_4285(self):
+ """Regression test for QPID-4285: on deleting a queue it gets stuck in a
+ partially deleted state and causes replication errors."""
cluster = HaCluster(self,2)
cluster[1].wait_status("ready")
s = cluster[0].connect().session()
@@ -669,6 +669,72 @@ acl deny all all
except NotFound: pass
assert not cluster[1].agent().getQueue("q") # Should not be in QMF
+ def alt_setup(self, session, suffix):
+ # Create exchange to use as alternate and a queue bound to it.
+ # altex exchange: acts as alternate exchange
+ session.sender("altex%s;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}"%(suffix))
+ # altq queue bound to altex, collect re-routed messages.
+ session.sender("altq%s;{create:always,node:{x-bindings:[{exchange:'altex%s',queue:altq%s}]}}"%(suffix,suffix,suffix))
+
+ def test_auto_delete_close(self):
+ """Verify auto-delete queues are deleted on backup if auto-deleted
+ on primary"""
+ cluster=HaCluster(self, 2)
+ p = cluster[0].connect().session()
+ self.alt_setup(p, "1")
+ r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex1'}}}", capacity=1)
+ s = p.sender("adq1")
+ for m in ["aa","bb","cc"]: s.send(m)
+ p.sender("adq2;{create:always,node:{x-declare:{auto-delete:True}}}")
+ cluster[1].wait_queue("adq1")
+ cluster[1].wait_queue("adq2")
+ r.close() # trigger auto-delete of adq1
+ cluster[1].wait_no_queue("adq1")
+ cluster[1].assert_browse_backup("altq1", ["aa","bb","cc"])
+ cluster[1].wait_queue("adq2")
+
+ def test_auto_delete_crash(self):
+ """Verify auto-delete queues are deleted on backup if the primary crashes"""
+ cluster=HaCluster(self, 2)
+ p = cluster[0].connect().session()
+ self.alt_setup(p,"1")
+
+ # adq1 is subscribed so will be auto-deleted.
+ r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex1'}}}", capacity=1)
+ s = p.sender("adq1")
+ for m in ["aa","bb","cc"]: s.send(m)
+ # adq2 is subscribed after cluster[2] starts.
+ p.sender("adq2;{create:always,node:{x-declare:{auto-delete:True}}}")
+ # adq3 is never subscribed.
+ p.sender("adq3;{create:always,node:{x-declare:{auto-delete:True}}}")
+
+ cluster.start()
+ cluster[2].wait_status("ready")
+
+ p.receiver("adq2") # Subscribed after cluster[2] joined
+
+ for q in ["adq1","adq2","adq3","altq1"]: cluster[1].wait_queue(q)
+ for q in ["adq1","adq2","adq3","altq1"]: cluster[2].wait_queue(q)
+ cluster[0].kill()
+
+ cluster[1].wait_no_queue("adq1")
+ cluster[1].wait_no_queue("adq2")
+ cluster[1].wait_queue("adq3")
+
+ cluster[2].wait_no_queue("adq1")
+ cluster[2].wait_no_queue("adq2")
+ cluster[2].wait_queue("adq3")
+
+ cluster[1].assert_browse_backup("altq1", ["aa","bb","cc"])
+ cluster[2].assert_browse_backup("altq1", ["aa","bb","cc"])
+
+ def test_auto_delete_timeout(self):
+ cluster = HaCluster(self, 2)
+ s = cluster[0].connect().session().receiver("q;{create:always,node:{x-declare:{auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}")
+ cluster[1].wait_queue("q")
+ cluster[0].kill()
+ cluster[1].wait_queue("q") # Not timed out yet
+ cluster[1].wait_no_queue("q") # Wait for timeout
def fairshare(msgs, limit, levels):
"""