diff options
Diffstat (limited to 'qpid/cpp/src/tests/cluster_tests.py')
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 25 |
1 files changed, 16 insertions, 9 deletions
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 2db2cdd433..cbc3df4a6b 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -241,6 +241,7 @@ acl deny all all retry(lambda: find_in_file("brokerLinkUp", qp.outfile("out"))) broker1.ready() broker2.ready() + qr.wait() def test_queue_cleaner(self): """ Regression test to ensure that cleanup of expired messages works correctly """ @@ -310,6 +311,7 @@ acl deny all all client was attached. """ args=["--mgmt-pub-interval=1","--log-enable=trace+:management"] + # First broker will be killed. cluster0 = self.cluster(1, args=args) cluster1 = self.cluster(1, args=args) @@ -345,9 +347,11 @@ acl deny all all # Force a change of elder cluster0.start() + for b in cluster0: b.ready() cluster0[0].expect=EXPECT_EXIT_FAIL # About to die. cluster0[0].kill() time.sleep(2) # Allow a management interval to pass. + for b in cluster0[1:]: b.ready() # Verify logs are consistent cluster_test_logs.verify_logs() @@ -883,16 +887,20 @@ class DtxTests(BrokerTest): t5.send(["1", "2"]) # Accept messages in a transaction before/after join then commit + # Note: Message sent outside transaction, we're testing transactional acceptance. t6 = DtxTestFixture(self, cluster[0], "t6") t6.send(["a","b","c"]) t6.start() self.assertEqual(t6.accept().body, "a"); + t6.verify(sessions, ["b", "c"]) # Accept messages in a transaction before/after join then roll back + # Note: Message sent outside transaction, we're testing transactional acceptance. t7 = DtxTestFixture(self, cluster[0], "t7") t7.send(["a","b","c"]) t7.start() self.assertEqual(t7.accept().body, "a"); + t7.verify(sessions, ["b", "c"]) # Ended, suspended transactions across join. t8 = DtxTestFixture(self, cluster[0], "t8") @@ -948,6 +956,7 @@ class DtxTests(BrokerTest): # Rollback t7 self.assertEqual(t7.accept().body, "b"); + t7.verify(sessions, ["c"]) t7.end() t7.rollback() t7.verify(sessions, ["a", "b", "c"]) @@ -1046,8 +1055,8 @@ class LongTests(BrokerTest): # Start sender and receiver threads cluster[0].declare_queue("test-queue") - sender = NumberedSender(cluster[0], 1000) # Max queue depth - receiver = NumberedReceiver(cluster[0], sender) + sender = NumberedSender(cluster[0], max_depth=1000) + receiver = NumberedReceiver(cluster[0], sender=sender) receiver.start() sender.start() # Wait for sender & receiver to get up and running @@ -1215,25 +1224,23 @@ class LongTests(BrokerTest): receiver = NumberedReceiver(cluster[0]) receiver.start() - senders = [NumberedSender(cluster[0]) for i in range(1,3)] - for s in senders: - s.start() + sender = NumberedSender(cluster[0]) + sender.start() # Wait for senders & receiver to get up and running - retry(lambda: receiver.received > 2*senders) + retry(lambda: receiver.received > 10) # Kill original brokers, start new ones for the duration. endtime = time.time() + self.duration(); i = 0 while time.time() < endtime: - for s in senders: s.sender.assert_running() + sender.sender.assert_running() receiver.receiver.assert_running() for b in cluster[i:]: b.ready() # Check if any broker crashed. cluster[i].kill() i += 1 b = cluster.start(expect=EXPECT_EXIT_FAIL) time.sleep(5) - for s in senders: - s.stop() + sender.stop() receiver.stop() for i in range(i, len(cluster)): cluster[i].kill() |