diff options
Diffstat (limited to 'qpid/cpp/src/tests/cluster_tests.py')
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 143 |
1 files changed, 139 insertions, 4 deletions
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 66cfd27f7b..f5ce2a234f 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -18,11 +18,12 @@ # under the License. # -import os, signal, sys, time, imp, re, subprocess, glob, cluster_test_logs +import os, signal, sys, time, imp, re, subprocess, glob, random, logging +import cluster_test_logs from qpid import datatypes, messaging from brokertest import * from qpid.harness import Skipped -from qpid.messaging import Message, Empty, Disposition, REJECTED +from qpid.messaging import Message, Empty, Disposition, REJECTED, util from threading import Thread, Lock, Condition from logging import getLogger from itertools import chain @@ -96,9 +97,15 @@ class ShortTests(BrokerTest): destination="amq.direct", message=qpid.datatypes.Message(props, "content")) + # Try message with TTL and differnet headers/properties + cluster[0].send_message("q", Message(durable=True, ttl=100000)) + cluster[0].send_message("q", Message(durable=True, properties={}, ttl=100000)) + cluster[0].send_message("q", Message(durable=True, properties={"x":10}, ttl=100000)) + # Now update a new member and compare their dumps. cluster.start(args=["--test-store-dump", "updatee.dump"]) assert readfile("direct.dump") == readfile("updatee.dump") + os.remove("direct.dump") os.remove("updatee.dump") @@ -687,6 +694,25 @@ acl allow all all self.assert_browse(s1, "q", ["foo"]) + def test_ttl_consistent(self): + """Ensure we don't get inconsistent errors with message that have TTL very close together""" + messages = [ Message(str(i), ttl=i/1000.0) for i in xrange(0,1000)] + messages.append(Message("x")) + cluster = self.cluster(2) + sender = cluster[0].connect().session().sender("q;{create:always}") + + def fetch(b): + receiver = b.connect().session().receiver("q;{create:always}") + while receiver.fetch().content != "x": pass + + for m in messages: sender.send(m, sync=False) + for m in messages: sender.send(m, sync=False) + fetch(cluster[0]) + fetch(cluster[1]) + for m in messages: sender.send(m, sync=False) + cluster.start() + fetch(cluster[2]) + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" def duration(self): @@ -811,7 +837,7 @@ class LongTests(BrokerTest): endtime = time.time() + self.duration() # For long duration, first run is a quarter of the duration. - runtime = max(5, self.duration() / 4.0) + runtime = min(5.0, self.duration() / 3.0) alive = 0 # First live cluster member for i in range(len(cluster)): start_clients(cluster[i]) start_mclients(cluster[alive]) @@ -853,7 +879,7 @@ class LongTests(BrokerTest): end = time.time() + self.duration() while (time.time() < end): # Get a management interval for i in xrange(1000): cluster[0].connect().close() - cluster_test_logs.verify_logs() + cluster_test_logs.verify_logs() def test_flowlimit_failover(self): """Test fail-over during continuous send-receive with flow control @@ -880,6 +906,7 @@ class LongTests(BrokerTest): while time.time() < endtime: for s in senders: s.sender.assert_running() receiver.receiver.assert_running() + for b in cluster[i:]: b.ready() # Check if any broker crashed. cluster[i].kill() i += 1 b = cluster.start(expect=EXPECT_EXIT_FAIL) @@ -889,6 +916,114 @@ class LongTests(BrokerTest): receiver.stop() for i in range(i, len(cluster)): cluster[i].kill() + def test_ttl_failover(self): + """Test that messages with TTL don't cause problems in a cluster with failover""" + + class Client(StoppableThread): + + def __init__(self, broker): + StoppableThread.__init__(self) + self.connection = broker.connect(reconnect=True) + self.auto_fetch_reconnect_urls(self.connection) + self.session = self.connection.session() + + def auto_fetch_reconnect_urls(self, conn): + """Replacment for qpid.messaging.util version which is noisy""" + ssn = conn.session("auto-fetch-reconnect-urls") + rcv = ssn.receiver("amq.failover") + rcv.capacity = 10 + + def main(): + while True: + try: + msg = rcv.fetch() + qpid.messaging.util.set_reconnect_urls(conn, msg) + ssn.acknowledge(msg, sync=False) + except messaging.exceptions.LinkClosed: return + except messaging.exceptions.ConnectionError: return + + thread = Thread(name="auto-fetch-reconnect-urls", target=main) + thread.setDaemon(True) + thread.start() + + def stop(self): + StoppableThread.stop(self) + self.connection.detach() + + class Sender(Client): + def __init__(self, broker, address): + Client.__init__(self, broker) + self.sent = 0 # Number of messages _reliably_ sent. + self.sender = self.session.sender(address, capacity=1000) + + def send_counted(self, ttl): + self.sender.send(Message(str(self.sent), ttl=ttl)) + self.sent += 1 + + def run(self): + while not self.stopped: + choice = random.randint(0,4) + if choice == 0: self.send_counted(None) # No ttl + elif choice == 1: self.send_counted(100000) # Large ttl + else: # Small ttl, might expire + self.sender.send(Message("", ttl=random.random()/10)) + self.sender.send(Message("z"), sync=True) # Chaser. + + class Receiver(Client): + + def __init__(self, broker, address): + Client.__init__(self, broker) + self.received = 0 # Number of non-empty (reliable) messages received. + self.receiver = self.session.receiver(address, capacity=1000) + def run(self): + try: + while True: + m = self.receiver.fetch(1) + if m.content == "z": break + if m.content: # Ignore unreliable messages + # Ignore duplicates + if int(m.content) == self.received: self.received += 1 + except Exception,e: self.error = e + + # def test_ttl_failover + + # Original cluster will all be killed so expect exit with failure + # Set small purge interval. + cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["--queue-purge-interval=1"]) + # Python client failover produces noisy WARN logs, disable temporarily + logger = logging.getLogger() + log_level = logger.getEffectiveLevel() + logger.setLevel(logging.ERROR) + try: + # Start sender and receiver threads + receiver = Receiver(cluster[0], "q;{create:always}") + receiver.start() + sender = Sender(cluster[0], "q;{create:always}") + sender.start() + + # Kill brokers in a cycle. + endtime = time.time() + self.duration() + runtime = min(5.0, self.duration() / 4.0) + i = 0 + while time.time() < endtime: + for b in cluster[i:]: b.ready() # Check if any broker crashed. + cluster[i].kill() + i += 1 + b = cluster.start(expect=EXPECT_EXIT_FAIL) + b.ready() + time.sleep(runtime) + sender.stop() + receiver.stop() + for b in cluster[i:]: + b.ready() # Check it didn't crash + b.kill() + self.assertEqual(sender.sent, receiver.received) + cluster_test_logs.verify_logs() + finally: + # Detach to avoid slow reconnect attempts during shut-down if test fails. + sender.connection.detach() + receiver.connection.detach() + logger.setLevel(log_level) class StoreTests(BrokerTest): """ |