diff options
Diffstat (limited to 'cpp/src/tests')
-rw-r--r-- | cpp/src/tests/brokertest.py | 4 | ||||
-rwxr-xr-x | cpp/src/tests/ha_test.py | 21 | ||||
-rwxr-xr-x | cpp/src/tests/ha_tests.py | 72 |
3 files changed, 88 insertions, 9 deletions
diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py index 0ab0d13424..0597a933a3 100644 --- a/cpp/src/tests/brokertest.py +++ b/cpp/src/tests/brokertest.py @@ -203,8 +203,8 @@ class Popen(subprocess.Popen): self.wait() def kill(self): - try: - subprocess.Popen.kill(self) + self.expect = EXPECT_EXIT_FAIL + try: subprocess.Popen.kill(self) except AttributeError: # No terminate method try: os.kill( self.pid , signal.SIGKILL) diff --git a/cpp/src/tests/ha_test.py b/cpp/src/tests/ha_test.py index 462c90bfb3..5cf28f6ef9 100755 --- a/cpp/src/tests/ha_test.py +++ b/cpp/src/tests/ha_test.py @@ -129,6 +129,16 @@ class HaBroker(Broker): assert retry(try_get_status, timeout=20), "%s expected=%r, actual=%r"%( self, status, self._status) + def wait_queue(self, queue, timeout=1): + """ Wait for queue to be visible via QMF""" + agent = self.agent() + assert retry(lambda: agent.getQueue(queue) is not None, timeout=timeout) + + def wait_no_queue(self, queue, timeout=1): + """ Wait for queue to be invisible via QMF""" + agent = self.agent() + assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout) + # FIXME aconway 2012-05-01: do direct python call to qpid-config code. def qpid_config(self, args): assert subprocess.call( @@ -185,6 +195,9 @@ class HaBroker(Broker): def ready(self): return Broker.ready(self, client_properties={"qpid.ha-admin":1}) + def kill(self): + self._agent = None + return Broker.kill(self) class HaCluster(object): _cluster_count = 0 @@ -225,7 +238,6 @@ class HaCluster(object): def kill(self, i, promote_next=True): """Kill broker i, promote broker i+1""" - self[i].expect = EXPECT_EXIT_FAIL self[i].kill() if promote_next: self[(i+1) % len(self)].promote() @@ -254,12 +266,11 @@ class HaCluster(object): def __getitem__(self,index): return self._brokers[index] def __iter__(self): return self._brokers.__iter__() + def wait_address(session, address): """Wait for an address to become valid.""" def check(): - try: - session.sender(address) - return True + try: session.sender(address); return True except NotFound: return False assert retry(check), "Timed out waiting for address %s"%(address) @@ -269,3 +280,5 @@ def valid_address(session, address): session.receiver(address) return True except NotFound: return False + + diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py index 7ce0d1701a..63fd48b66c 100755 --- a/cpp/src/tests/ha_tests.py +++ b/cpp/src/tests/ha_tests.py @@ -652,9 +652,9 @@ acl deny all all self.assertRaises(NotFound, s.receiver, ("e2")); - def test_auto_delete_qpid_4285(self): - """Regression test for QPID-4285: an auto delete queue gets stuck in - a partially deleted state and causes replication errors.""" + def test_delete_qpid_4285(self): + """Regression test for QPID-4285: on deleting a queue it gets stuck in a + partially deleted state and causes replication errors.""" cluster = HaCluster(self,2) cluster[1].wait_status("ready") s = cluster[0].connect().session() @@ -669,6 +669,72 @@ acl deny all all except NotFound: pass assert not cluster[1].agent().getQueue("q") # Should not be in QMF + def alt_setup(self, session, suffix): + # Create exchange to use as alternate and a queue bound to it. + # altex exchange: acts as alternate exchange + session.sender("altex%s;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}"%(suffix)) + # altq queue bound to altex, collect re-routed messages. + session.sender("altq%s;{create:always,node:{x-bindings:[{exchange:'altex%s',queue:altq%s}]}}"%(suffix,suffix,suffix)) + + def test_auto_delete_close(self): + """Verify auto-delete queues are deleted on backup if auto-deleted + on primary""" + cluster=HaCluster(self, 2) + p = cluster[0].connect().session() + self.alt_setup(p, "1") + r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex1'}}}", capacity=1) + s = p.sender("adq1") + for m in ["aa","bb","cc"]: s.send(m) + p.sender("adq2;{create:always,node:{x-declare:{auto-delete:True}}}") + cluster[1].wait_queue("adq1") + cluster[1].wait_queue("adq2") + r.close() # trigger auto-delete of adq1 + cluster[1].wait_no_queue("adq1") + cluster[1].assert_browse_backup("altq1", ["aa","bb","cc"]) + cluster[1].wait_queue("adq2") + + def test_auto_delete_crash(self): + """Verify auto-delete queues are deleted on backup if the primary crashes""" + cluster=HaCluster(self, 2) + p = cluster[0].connect().session() + self.alt_setup(p,"1") + + # adq1 is subscribed so will be auto-deleted. + r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex1'}}}", capacity=1) + s = p.sender("adq1") + for m in ["aa","bb","cc"]: s.send(m) + # adq2 is subscribed after cluster[2] starts. + p.sender("adq2;{create:always,node:{x-declare:{auto-delete:True}}}") + # adq3 is never subscribed. + p.sender("adq3;{create:always,node:{x-declare:{auto-delete:True}}}") + + cluster.start() + cluster[2].wait_status("ready") + + p.receiver("adq2") # Subscribed after cluster[2] joined + + for q in ["adq1","adq2","adq3","altq1"]: cluster[1].wait_queue(q) + for q in ["adq1","adq2","adq3","altq1"]: cluster[2].wait_queue(q) + cluster[0].kill() + + cluster[1].wait_no_queue("adq1") + cluster[1].wait_no_queue("adq2") + cluster[1].wait_queue("adq3") + + cluster[2].wait_no_queue("adq1") + cluster[2].wait_no_queue("adq2") + cluster[2].wait_queue("adq3") + + cluster[1].assert_browse_backup("altq1", ["aa","bb","cc"]) + cluster[2].assert_browse_backup("altq1", ["aa","bb","cc"]) + + def test_auto_delete_timeout(self): + cluster = HaCluster(self, 2) + s = cluster[0].connect().session().receiver("q;{create:always,node:{x-declare:{auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}") + cluster[1].wait_queue("q") + cluster[0].kill() + cluster[1].wait_queue("q") # Not timed out yet + cluster[1].wait_no_queue("q") # Wait for timeout def fairshare(msgs, limit, levels): """ |