summaryrefslogtreecommitdiff
path: root/cpp/src/tests/cluster_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/cluster_tests.py')
-rwxr-xr-xcpp/src/tests/cluster_tests.py25
1 files changed, 16 insertions, 9 deletions
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index 2db2cdd433..cbc3df4a6b 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -241,6 +241,7 @@ acl deny all all
retry(lambda: find_in_file("brokerLinkUp", qp.outfile("out")))
broker1.ready()
broker2.ready()
+ qr.wait()
def test_queue_cleaner(self):
""" Regression test to ensure that cleanup of expired messages works correctly """
@@ -310,6 +311,7 @@ acl deny all all
client was attached.
"""
args=["--mgmt-pub-interval=1","--log-enable=trace+:management"]
+
# First broker will be killed.
cluster0 = self.cluster(1, args=args)
cluster1 = self.cluster(1, args=args)
@@ -345,9 +347,11 @@ acl deny all all
# Force a change of elder
cluster0.start()
+ for b in cluster0: b.ready()
cluster0[0].expect=EXPECT_EXIT_FAIL # About to die.
cluster0[0].kill()
time.sleep(2) # Allow a management interval to pass.
+ for b in cluster0[1:]: b.ready()
# Verify logs are consistent
cluster_test_logs.verify_logs()
@@ -883,16 +887,20 @@ class DtxTests(BrokerTest):
t5.send(["1", "2"])
# Accept messages in a transaction before/after join then commit
+ # Note: Message sent outside transaction, we're testing transactional acceptance.
t6 = DtxTestFixture(self, cluster[0], "t6")
t6.send(["a","b","c"])
t6.start()
self.assertEqual(t6.accept().body, "a");
+ t6.verify(sessions, ["b", "c"])
# Accept messages in a transaction before/after join then roll back
+ # Note: Message sent outside transaction, we're testing transactional acceptance.
t7 = DtxTestFixture(self, cluster[0], "t7")
t7.send(["a","b","c"])
t7.start()
self.assertEqual(t7.accept().body, "a");
+ t7.verify(sessions, ["b", "c"])
# Ended, suspended transactions across join.
t8 = DtxTestFixture(self, cluster[0], "t8")
@@ -948,6 +956,7 @@ class DtxTests(BrokerTest):
# Rollback t7
self.assertEqual(t7.accept().body, "b");
+ t7.verify(sessions, ["c"])
t7.end()
t7.rollback()
t7.verify(sessions, ["a", "b", "c"])
@@ -1046,8 +1055,8 @@ class LongTests(BrokerTest):
# Start sender and receiver threads
cluster[0].declare_queue("test-queue")
- sender = NumberedSender(cluster[0], 1000) # Max queue depth
- receiver = NumberedReceiver(cluster[0], sender)
+ sender = NumberedSender(cluster[0], max_depth=1000)
+ receiver = NumberedReceiver(cluster[0], sender=sender)
receiver.start()
sender.start()
# Wait for sender & receiver to get up and running
@@ -1215,25 +1224,23 @@ class LongTests(BrokerTest):
receiver = NumberedReceiver(cluster[0])
receiver.start()
- senders = [NumberedSender(cluster[0]) for i in range(1,3)]
- for s in senders:
- s.start()
+ sender = NumberedSender(cluster[0])
+ sender.start()
# Wait for senders & receiver to get up and running
- retry(lambda: receiver.received > 2*senders)
+ retry(lambda: receiver.received > 10)
# Kill original brokers, start new ones for the duration.
endtime = time.time() + self.duration();
i = 0
while time.time() < endtime:
- for s in senders: s.sender.assert_running()
+ sender.sender.assert_running()
receiver.receiver.assert_running()
for b in cluster[i:]: b.ready() # Check if any broker crashed.
cluster[i].kill()
i += 1
b = cluster.start(expect=EXPECT_EXIT_FAIL)
time.sleep(5)
- for s in senders:
- s.stop()
+ sender.stop()
receiver.stop()
for i in range(i, len(cluster)): cluster[i].kill()