diff options
Diffstat (limited to 'qpid/cpp/src/tests/cluster_tests.py')
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 572 |
1 files changed, 54 insertions, 518 deletions
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 807e9508c3..3e13a3ce8a 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -18,12 +18,11 @@ # under the License. # -import os, signal, sys, time, imp, re, subprocess, glob, random, logging -import cluster_test_logs +import os, signal, sys, time, imp, re, subprocess, glob, cluster_test_logs from qpid import datatypes, messaging from brokertest import * from qpid.harness import Skipped -from qpid.messaging import Message, Empty, Disposition, REJECTED, util +from qpid.messaging import Message, Empty from threading import Thread, Lock, Condition from logging import getLogger from itertools import chain @@ -97,15 +96,9 @@ class ShortTests(BrokerTest): destination="amq.direct", message=qpid.datatypes.Message(props, "content")) - # Try message with TTL and differnet headers/properties - cluster[0].send_message("q", Message(durable=True, ttl=100000)) - cluster[0].send_message("q", Message(durable=True, properties={}, ttl=100000)) - cluster[0].send_message("q", Message(durable=True, properties={"x":10}, ttl=100000)) - # Now update a new member and compare their dumps. cluster.start(args=["--test-store-dump", "updatee.dump"]) assert readfile("direct.dump") == readfile("updatee.dump") - os.remove("direct.dump") os.remove("updatee.dump") @@ -253,6 +246,25 @@ acl allow all all session1 = cluster[1].connect().session() for q in queues: self.assert_browse(session1, "q1", ["foo"]) + def test_dr_no_message(self): + """Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=655141 + Joining broker crashes with 'error deliveryRecord no update message' + """ + + cluster = self.cluster(1) + session0 = cluster[0].connect().session() + s = session0.sender("q1;{create:always}") + s.send(Message("a", ttl=0.05), sync=False) + s.send(Message("b", ttl=0.05), sync=False) + r1 = session0.receiver("q1") + self.assertEqual("a", r1.fetch(timeout=0).content) + r2 = session0.receiver("q1;{mode:browse}") + self.assertEqual("b", r2.fetch(timeout=0).content) + # Leave messages un-acknowledged, let the expire, then start new broker. + time.sleep(.1) + cluster.start() + self.assertRaises(Empty, cluster[1].connect().session().receiver("q1").fetch,0) + def test_route_update(self): """Regression test for https://issues.apache.org/jira/browse/QPID-2982 Links and bridges associated with routes were not replicated on update. @@ -260,7 +272,6 @@ acl allow all all client was attached. """ args=["--mgmt-pub-interval=1","--log-enable=trace+:management"] - # First broker will be killed. cluster0 = self.cluster(1, args=args) cluster1 = self.cluster(1, args=args) assert 0 == subprocess.call( @@ -290,47 +301,9 @@ acl allow all all qpid_tool.wait() scanner.join() assert scanner.found - # Regression test for https://issues.apache.org/jira/browse/QPID-3235 - # Inconsistent stats when changing elder. - - # Force a change of elder - cluster0.start() - cluster0[0].expect=EXPECT_EXIT_FAIL # About to die. - cluster0[0].kill() - time.sleep(2) # Allow a management interval to pass. # Verify logs are consistent cluster_test_logs.verify_logs() - def test_redelivered(self): - """Verify that redelivered flag is set correctly on replayed messages""" - cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL) - url = "amqp:tcp:%s,tcp:%s" % (cluster[0].host_port(), cluster[1].host_port()) - queue = "my-queue" - cluster[0].declare_queue(queue) - self.sender = self.popen( - ["qpid-send", - "--broker", url, - "--address", queue, - "--sequence=true", - "--send-eos=1", - "--messages=100000", - "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS) - ]) - self.receiver = self.popen( - ["qpid-receive", - "--broker", url, - "--address", queue, - "--ignore-duplicates", - "--check-redelivered", - "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS), - "--forever" - ]) - time.sleep(1)#give sender enough time to have some messages to replay - cluster[0].kill() - self.sender.wait() - self.receiver.wait() - cluster[1].kill() - class BlockedSend(Thread): """Send a message, send is expected to block. Verify that it does block (for a given timeout), then allow @@ -343,7 +316,7 @@ acl allow all all Thread.__init__(self) def run(self): try: - self.sender.send(self.msg, sync=True) + self.sender.send(self.msg) self.condition.acquire() try: self.blocked = False @@ -375,12 +348,11 @@ acl allow all all ssn0 = brokers.first().connect().session() s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}") brokers.first().startQmf() - q1 = [q for q in brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0] - oid = q1.getObjectId() - self.assertEqual(q1.name, "flq") - self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) - assert not q1.flowStopped - self.assertEqual(q1.flowStoppedCount, 0) + q = [q for q in brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0] + oid = q.getObjectId() + self.assertEqual(q.name, "flq") + self.assertEqual(q.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) + assert not q.flowStopped # fill the queue on one broker until flow control is active for x in range(5): s0.send(Message(str(x))) @@ -388,20 +360,18 @@ acl allow all all sender.start() # Tests that sender does block # Verify the broker queue goes into a flowStopped state deadline = time.time() + 1 - while not q1.flowStopped and time.time() < deadline: q1.update() - assert q1.flowStopped - self.assertEqual(q1.flowStoppedCount, 1) + while not q.flowStopped and time.time() < deadline: q.update() + assert q.flowStopped sender.assert_blocked() # Still blocked # Now verify the both brokers in cluster have same configuration brokers.second().startQmf() qs = brokers.second().qmf_session.getObjects(_objectId=oid) self.assertEqual(len(qs), 1) - q2 = qs[0] - self.assertEqual(q2.name, "flq") - self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) - assert q2.flowStopped - self.assertEqual(q2.flowStoppedCount, 1) + q = qs[0] + self.assertEqual(q.name, "flq") + self.assertEqual(q.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) + assert q.flowStopped # now drain the queue using a session to the other broker ssn1 = brokers.second().connect().session() @@ -411,12 +381,6 @@ acl allow all all ssn1.acknowledge() sender.wait() # Verify no longer blocked. - # and re-verify state of queue on both brokers - q1.update() - assert not q1.flowStopped - q2.update() - assert not q2.flowStopped - ssn0.connection.close() ssn1.connection.close() cluster_test_logs.verify_logs() @@ -430,6 +394,7 @@ acl allow all all self.queue_flowlimit_test(Brokers()) def test_queue_flowlimit_cluster(self): + return # TODO aconway 2011-02-18: disabled till fixed, QPID-2935 cluster = self.cluster(2) class Brokers: def first(self): return cluster[0] @@ -437,6 +402,7 @@ acl allow all all self.queue_flowlimit_test(Brokers()) def test_queue_flowlimit_cluster_join(self): + return # TODO aconway 2011-02-18: disabled till fixed, QPID-2935 cluster = self.cluster(1) class Brokers: def first(self): return cluster[0] @@ -445,274 +411,6 @@ acl allow all all return cluster[1] self.queue_flowlimit_test(Brokers()) - def test_queue_flowlimit_replicate(self): - """ Verify that a queue which is in flow control BUT has drained BELOW - the flow control 'stop' threshold, is correctly replicated when a new - broker is added to the cluster. - """ - - class AsyncSender(Thread): - """Send a fixed number of msgs from a sender in a separate thread - so it may block without blocking the test. - """ - def __init__(self, broker, address, count=1, size=4): - Thread.__init__(self) - self.daemon = True - self.broker = broker - self.queue = address - self.count = count - self.size = size - self.done = False - - def run(self): - self.sender = subprocess.Popen(["qpid-send", - "--capacity=1", - "--content-size=%s" % self.size, - "--messages=%s" % self.count, - "--failover-updates", - "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS), - "--address=%s" % self.queue, - "--broker=%s" % self.broker.host_port()]) - self.sender.wait() - self.done = True - - cluster = self.cluster(2) - # create a queue with rather draconian flow control settings - ssn0 = cluster[0].connect().session() - s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':100, 'qpid.flow_resume_count':20}}}}") - - # fire off the sending thread to broker[0], and wait until the queue - # hits flow control on broker[1] - sender = AsyncSender(cluster[0], "flq", count=110); - sender.start(); - - cluster[1].startQmf() - q_obj = [q for q in cluster[1].qmf_session.getObjects(_class="queue") if q.name == "flq"][0] - deadline = time.time() + 10 - while not q_obj.flowStopped and time.time() < deadline: - q_obj.update() - assert q_obj.flowStopped - assert not sender.done - assert q_obj.msgDepth < 110 - - # Now drain enough messages on broker[1] to drop below the flow stop - # threshold, but not relieve flow control... - receiver = subprocess.Popen(["qpid-receive", - "--messages=15", - "--timeout=1", - "--print-content=no", - "--failover-updates", - "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS), - "--ack-frequency=1", - "--address=flq", - "--broker=%s" % cluster[1].host_port()]) - receiver.wait() - q_obj.update() - assert q_obj.flowStopped - assert not sender.done - current_depth = q_obj.msgDepth - - # add a new broker to the cluster, and verify that the queue is in flow - # control on that broker - cluster.start() - cluster[2].startQmf() - q_obj = [q for q in cluster[2].qmf_session.getObjects(_class="queue") if q.name == "flq"][0] - assert q_obj.flowStopped - assert q_obj.msgDepth == current_depth - - # now drain the queue on broker[2], and verify that the sender becomes - # unblocked - receiver = subprocess.Popen(["qpid-receive", - "--messages=95", - "--timeout=1", - "--print-content=no", - "--failover-updates", - "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS), - "--ack-frequency=1", - "--address=flq", - "--broker=%s" % cluster[2].host_port()]) - receiver.wait() - q_obj.update() - assert not q_obj.flowStopped - self.assertEqual(q_obj.msgDepth, 0) - - # verify that the sender has become unblocked - sender.join(timeout=5) - assert not sender.isAlive() - assert sender.done - - def test_blocked_queue_delete(self): - """Verify that producers which are blocked on a queue due to flow - control are unblocked when that queue is deleted. - """ - - cluster = self.cluster(2) - cluster[0].startQmf() - cluster[1].startQmf() - - # configure a queue with a specific flow limit on first broker - ssn0 = cluster[0].connect().session() - s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}") - q1 = [q for q in cluster[0].qmf_session.getObjects(_class="queue") if q.name == "flq"][0] - oid = q1.getObjectId() - self.assertEqual(q1.name, "flq") - self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) - assert not q1.flowStopped - self.assertEqual(q1.flowStoppedCount, 0) - - # fill the queue on one broker until flow control is active - for x in range(5): s0.send(Message(str(x))) - sender = ShortTests.BlockedSend(s0, Message(str(6))) - sender.start() # Tests that sender does block - # Verify the broker queue goes into a flowStopped state - deadline = time.time() + 1 - while not q1.flowStopped and time.time() < deadline: q1.update() - assert q1.flowStopped - self.assertEqual(q1.flowStoppedCount, 1) - sender.assert_blocked() # Still blocked - - # Now verify the both brokers in cluster have same configuration - qs = cluster[1].qmf_session.getObjects(_objectId=oid) - self.assertEqual(len(qs), 1) - q2 = qs[0] - self.assertEqual(q2.name, "flq") - self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) - assert q2.flowStopped - self.assertEqual(q2.flowStoppedCount, 1) - - # now delete the blocked queue from other broker - ssn1 = cluster[1].connect().session() - self.evaluate_address(ssn1, "flq;{delete:always}") - sender.wait() # Verify no longer blocked. - - ssn0.connection.close() - ssn1.connection.close() - cluster_test_logs.verify_logs() - - - def test_alternate_exchange_update(self): - """Verify that alternate-exchange on exchanges and queues is propagated to new members of a cluster. """ - cluster = self.cluster(1) - s0 = cluster[0].connect().session() - # create alt queue bound to amq.fanout exchange, will be destination for alternate exchanges - self.evaluate_address(s0, "alt;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:alt}]}}") - # create direct exchange ex with alternate-exchange amq.fanout and no queues bound - self.evaluate_address(s0, "ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'amq.fanout'}}}") - # create queue q with alternate-exchange amq.fanout - self.evaluate_address(s0, "q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'amq.fanout'}}}") - - def verify(broker): - s = broker.connect().session() - # Verify unmatched message goes to ex's alternate. - s.sender("ex").send("foo") - self.assertEqual("foo", s.receiver("alt").fetch(timeout=0).content) - # Verify rejected message goes to q's alternate. - s.sender("q").send("bar") - msg = s.receiver("q").fetch(timeout=0) - self.assertEqual("bar", msg.content) - s.acknowledge(msg, Disposition(REJECTED)) # Reject the message - self.assertEqual("bar", s.receiver("alt").fetch(timeout=0).content) - - verify(cluster[0]) - cluster.start() - verify(cluster[1]) - - def test_binding_order(self): - """Regression test for binding order inconsistency in cluster""" - cluster = self.cluster(1) - c0 = cluster[0].connect() - s0 = c0.session() - # Declare multiple queues bound to same key on amq.topic - def declare(q,max=0): - if max: declare = 'x-declare:{arguments:{"qpid.max_count":%d, "qpid.flow_stop_count":0}}'%max - else: declare = 'x-declare:{}' - bind='x-bindings:[{queue:%s,key:key,exchange:"amq.topic"}]'%(q) - s0.sender("%s;{create:always,node:{%s,%s}}" % (q,declare,bind)) - declare('d',max=4) # Only one with a limit - for q in ['c', 'b','a']: declare(q) - # Add a cluster member, send enough messages to exceed the max count - cluster.start() - try: - s = s0.sender('amq.topic/key') - for m in xrange(1,6): s.send(Message(str(m))) - self.fail("Expected capacity exceeded exception") - except messaging.exceptions.TargetCapacityExceeded: pass - c1 = cluster[1].connect() - s1 = c1.session() - s0 = c0.session() # Old session s0 is broken by exception. - # Verify queue contents are consistent. - for q in ['a','b','c','d']: - self.assertEqual(self.browse(s0, q), self.browse(s1, q)) - # Verify queue contents are "best effort" - for q in ['a','b','c']: self.assert_browse(s1,q,[str(n) for n in xrange(1,6)]) - self.assert_browse(s1,'d',[str(n) for n in xrange(1,5)]) - - def test_deleted_exchange(self): - """QPID-3215: cached exchange reference can cause cluster inconsistencies - if exchange is deleted/recreated - Verify stand-alone case - """ - cluster = self.cluster() - # Verify we do not route message via an exchange that has been destroyed. - cluster.start() - s0 = cluster[0].connect().session() - self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}") - self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}") - send0 = s0.sender("ex/foo") - send0.send("foo") - self.assert_browse(s0, "q", ["foo"]) - self.evaluate_address(s0, "ex;{delete:always}") - try: - send0.send("bar") # Should fail, exchange is deleted. - self.fail("Expected not-found exception") - except qpid.messaging.NotFound: pass - self.assert_browse(cluster[0].connect().session(), "q", ["foo"]) - - def test_deleted_exchange_inconsistent(self): - """QPID-3215: cached exchange reference can cause cluster inconsistencies - if exchange is deleted/recreated - - Verify cluster inconsistency. - """ - cluster = self.cluster() - cluster.start() - s0 = cluster[0].connect().session() - self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}") - self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}") - send0 = s0.sender("ex/foo") - send0.send("foo") - self.assert_browse(s0, "q", ["foo"]) - - cluster.start() - s1 = cluster[1].connect().session() - self.evaluate_address(s0, "ex;{delete:always}") - try: - send0.send("bar") - self.fail("Expected not-found exception") - except qpid.messaging.NotFound: pass - - self.assert_browse(s1, "q", ["foo"]) - - - def test_ttl_consistent(self): - """Ensure we don't get inconsistent errors with message that have TTL very close together""" - messages = [ Message(str(i), ttl=i/1000.0) for i in xrange(0,1000)] - messages.append(Message("x")) - cluster = self.cluster(2) - sender = cluster[0].connect().session().sender("q;{create:always}") - - def fetch(b): - receiver = b.connect().session().receiver("q;{create:always}") - while receiver.fetch().content != "x": pass - - for m in messages: sender.send(m, sync=False) - for m in messages: sender.send(m, sync=False) - fetch(cluster[0]) - fetch(cluster[1]) - for m in messages: sender.send(m, sync=False) - cluster.start() - fetch(cluster[2]) - class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" def duration(self): @@ -725,28 +423,22 @@ class LongTests(BrokerTest): # Original cluster will all be killed so expect exit with failure cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL) - for b in cluster: b.ready() # Wait for brokers to be ready for b in cluster: ErrorGenerator(b) # Start sender and receiver threads cluster[0].declare_queue("test-queue") - sender = NumberedSender(cluster[0], 1000) # Max queue depth - receiver = NumberedReceiver(cluster[0], sender) + sender = NumberedSender(cluster[1], 1000) # Max queue depth + receiver = NumberedReceiver(cluster[2], sender) receiver.start() sender.start() - # Wait for sender & receiver to get up and running - retry(lambda: receiver.received > 0) # Kill original brokers, start new ones for the duration. endtime = time.time() + self.duration() i = 0 while time.time() < endtime: - sender.sender.assert_running() - receiver.receiver.assert_running() cluster[i].kill() i += 1 b = cluster.start(expect=EXPECT_EXIT_FAIL) - for b in cluster[i:]: b.ready() ErrorGenerator(b) time.sleep(5) sender.stop() @@ -777,24 +469,24 @@ class LongTests(BrokerTest): if self.stopped: break self.process = self.broker.test.popen( self.cmd, expect=EXPECT_UNKNOWN) - finally: - self.lock.release() - try: - exit = self.process.wait() + finally: self.lock.release() + try: exit = self.process.wait() except OSError, e: - # Process may already have been killed by self.stop() - break + # Seems to be a race in wait(), it throws + # "no such process" during test shutdown. + # Doesn't indicate a test error, ignore. + return except Exception, e: self.process.unexpected( "client of %s: %s"%(self.broker.name, e)) self.lock.acquire() try: + # Quit and ignore errors if stopped or expecting failure. if self.stopped: break if exit != 0: self.process.unexpected( "client of %s exit code %s"%(self.broker.name, exit)) - finally: - self.lock.release() + finally: self.lock.release() except Exception, e: self.error = RethrownException("Error in ClientLoop.run") @@ -816,7 +508,7 @@ class LongTests(BrokerTest): args += ["--log-enable=trace+:management"] # Use store if present. if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib] - cluster = self.cluster(3, args, expect=EXPECT_EXIT_FAIL) # brokers will be killed + cluster = self.cluster(3, args) clients = [] # Per-broker list of clients that only connect to one broker. mclients = [] # Management clients that connect to every broker in the cluster. @@ -825,12 +517,10 @@ class LongTests(BrokerTest): """Start ordinary clients for a broker.""" cmds=[ ["qpid-tool", "localhost:%s"%(broker.port())], - ["qpid-perftest", "--count=5000", "--durable=yes", + ["qpid-perftest", "--count", 50000, "--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()], - ["qpid-txtest", "--queue-base-name", "tx-%s"%str(qpid.datatypes.uuid4()), - "--port", broker.port()], - ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())] - ] + ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())], + ["testagent", "localhost", str(broker.port())] ] clients.append([ClientLoop(broker, cmd) for cmd in cmds]) def start_mclients(broker): @@ -839,8 +529,7 @@ class LongTests(BrokerTest): mclients.append(ClientLoop(broker, cmd)) endtime = time.time() + self.duration() - # For long duration, first run is a quarter of the duration. - runtime = min(5.0, self.duration() / 3.0) + runtime = self.duration() / 4 # First run is longer, use quarter of duration. alive = 0 # First live cluster member for i in range(len(cluster)): start_clients(cluster[i]) start_mclients(cluster[alive]) @@ -851,7 +540,7 @@ class LongTests(BrokerTest): for b in cluster[alive:]: b.ready() # Check if a broker crashed. # Kill the first broker, expect the clients to fail. b = cluster[alive] - b.ready() + b.expect = EXPECT_EXIT_FAIL b.kill() # Stop the brokers clients and all the mclients. for c in clients[alive] + mclients: @@ -861,179 +550,26 @@ class LongTests(BrokerTest): mclients = [] # Start another broker and clients alive += 1 - cluster.start(expect=EXPECT_EXIT_FAIL) - cluster[-1].ready() # Wait till its ready + cluster.start() start_clients(cluster[-1]) start_mclients(cluster[alive]) for c in chain(mclients, *clients): c.stop() - for b in cluster[alive:]: - b.ready() # Verify still alive - b.kill() + # Verify that logs are consistent cluster_test_logs.verify_logs() def test_management_qmf2(self): self.test_management(args=["--mgmt-qmf2=yes"]) - def test_connect_consistent(self): + def test_connect_consistent(self): # FIXME aconway 2011-01-18: args=["--mgmt-pub-interval=1","--log-enable=trace+:management"] cluster = self.cluster(2, args=args) end = time.time() + self.duration() while (time.time() < end): # Get a management interval for i in xrange(1000): cluster[0].connect().close() - cluster_test_logs.verify_logs() - - def test_flowlimit_failover(self): - """Test fail-over during continuous send-receive with flow control - active. - """ - - # Original cluster will all be killed so expect exit with failure - cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL) - for b in cluster: b.ready() # Wait for brokers to be ready - - # create a queue with rather draconian flow control settings - ssn0 = cluster[0].connect().session() - s0 = ssn0.sender("test-queue; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':2000, 'qpid.flow_resume_count':100}}}}") - - receiver = NumberedReceiver(cluster[0]) - receiver.start() - senders = [NumberedSender(cluster[0]) for i in range(1,3)] - for s in senders: - s.start() - # Wait for senders & receiver to get up and running - retry(lambda: receiver.received > 2*senders) - - # Kill original brokers, start new ones for the duration. - endtime = time.time() + self.duration(); - i = 0 - while time.time() < endtime: - for s in senders: s.sender.assert_running() - receiver.receiver.assert_running() - for b in cluster[i:]: b.ready() # Check if any broker crashed. - cluster[i].kill() - i += 1 - b = cluster.start(expect=EXPECT_EXIT_FAIL) - time.sleep(5) - for s in senders: - s.stop() - receiver.stop() - for i in range(i, len(cluster)): cluster[i].kill() - - def test_ttl_failover(self): - """Test that messages with TTL don't cause problems in a cluster with failover""" - - class Client(StoppableThread): - - def __init__(self, broker): - StoppableThread.__init__(self) - self.connection = broker.connect(reconnect=True) - self.auto_fetch_reconnect_urls(self.connection) - self.session = self.connection.session() - - def auto_fetch_reconnect_urls(self, conn): - """Replacment for qpid.messaging.util version which is noisy""" - ssn = conn.session("auto-fetch-reconnect-urls") - rcv = ssn.receiver("amq.failover") - rcv.capacity = 10 - - def main(): - while True: - try: - msg = rcv.fetch() - qpid.messaging.util.set_reconnect_urls(conn, msg) - ssn.acknowledge(msg, sync=False) - except messaging.exceptions.LinkClosed: return - except messaging.exceptions.ConnectionError: return - - thread = Thread(name="auto-fetch-reconnect-urls", target=main) - thread.setDaemon(True) - thread.start() - - def stop(self): - StoppableThread.stop(self) - self.connection.detach() - - class Sender(Client): - def __init__(self, broker, address): - Client.__init__(self, broker) - self.sent = 0 # Number of messages _reliably_ sent. - self.sender = self.session.sender(address, capacity=1000) - - def send_counted(self, ttl): - self.sender.send(Message(str(self.sent), ttl=ttl)) - self.sent += 1 - - def run(self): - while not self.stopped: - choice = random.randint(0,4) - if choice == 0: self.send_counted(None) # No ttl - elif choice == 1: self.send_counted(100000) # Large ttl - else: # Small ttl, might expire - self.sender.send(Message("", ttl=random.random()/10)) - self.sender.send(Message("z"), sync=True) # Chaser. - - class Receiver(Client): - - def __init__(self, broker, address): - Client.__init__(self, broker) - self.received = 0 # Number of non-empty (reliable) messages received. - self.receiver = self.session.receiver(address, capacity=1000) - def run(self): - try: - while True: - m = self.receiver.fetch(1) - if m.content == "z": break - if m.content: # Ignore unreliable messages - # Ignore duplicates - if int(m.content) == self.received: self.received += 1 - except Exception,e: self.error = e - - # def test_ttl_failover - - # Original cluster will all be killed so expect exit with failure - # Set small purge interval. - cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["--queue-purge-interval=1"]) - for b in cluster: b.ready() # Wait for brokers to be ready - - # Python client failover produces noisy WARN logs, disable temporarily - logger = logging.getLogger() - log_level = logger.getEffectiveLevel() - logger.setLevel(logging.ERROR) - try: - # Start sender and receiver threads - receiver = Receiver(cluster[0], "q;{create:always}") - receiver.start() - sender = Sender(cluster[0], "q;{create:always}") - sender.start() - # Wait for sender & receiver to get up and running - retry(lambda: receiver.received > 0) - - # Kill brokers in a cycle. - endtime = time.time() + self.duration() - runtime = min(5.0, self.duration() / 4.0) - i = 0 - while time.time() < endtime: - for b in cluster[i:]: b.ready() # Check if any broker crashed. - cluster[i].kill() - i += 1 - b = cluster.start(expect=EXPECT_EXIT_FAIL) - b.ready() - time.sleep(runtime) - sender.stop() - receiver.stop() - for b in cluster[i:]: - b.ready() # Check it didn't crash - b.kill() - self.assertEqual(sender.sent, receiver.received) cluster_test_logs.verify_logs() - finally: - # Detach to avoid slow reconnect attempts during shut-down if test fails. - sender.connection.detach() - receiver.connection.detach() - logger.setLevel(log_level) class StoreTests(BrokerTest): """ |