#!/usr/bin/env python # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import os, signal, sys, time, imp, re, subprocess, glob, cluster_test_logs from qpid import datatypes, messaging from brokertest import * from qpid.harness import Skipped from qpid.messaging import Message, Empty, Disposition, REJECTED from threading import Thread, Lock, Condition from logging import getLogger from itertools import chain from tempfile import NamedTemporaryFile log = getLogger("qpid.cluster_tests") # Note: brokers that shut themselves down due to critical error during # normal operation will still have an exit code of 0. Brokers that # shut down because of an error found during initialize will exit with # a non-0 code. Hence the apparently inconsistent use of EXPECT_EXIT_OK # and EXPECT_EXIT_FAIL in some of the tests below. # TODO aconway 2010-03-11: resolve this - ideally any exit due to an error # should give non-0 exit status. # Import scripts as modules qpid_cluster=import_script(checkenv("QPID_CLUSTER_EXEC")) def readfile(filename): """Returns te content of file named filename as a string""" f = file(filename) try: return f.read() finally: f.close() class ShortTests(BrokerTest): """Short cluster functionality tests.""" def test_message_replication(self): """Test basic cluster message replication.""" # Start a cluster, send some messages to member 0. cluster = self.cluster(2) s0 = cluster[0].connect().session() s0.sender("q; {create:always}").send(Message("x")) s0.sender("q; {create:always}").send(Message("y")) s0.connection.close() # Verify messages available on member 1. s1 = cluster[1].connect().session() m = s1.receiver("q", capacity=1).fetch(timeout=1) s1.acknowledge() self.assertEqual("x", m.content) s1.connection.close() # Start member 2 and verify messages available. s2 = cluster.start().connect().session() m = s2.receiver("q", capacity=1).fetch(timeout=1) s2.acknowledge() self.assertEqual("y", m.content) s2.connection.close() def test_store_direct_update_match(self): """Verify that brokers stores an identical message whether they receive it direct from clients or during an update, no header or other differences""" cluster = self.cluster(0, args=["--load-module", self.test_store_lib]) cluster.start(args=["--test-store-dump", "direct.dump"]) # Try messages with various headers cluster[0].send_message("q", Message(durable=True, content="foobar", subject="subject", reply_to="reply_to", properties={"n":10})) # Try messages of different sizes for size in range(0,10000,100): cluster[0].send_message("q", Message(content="x"*size, durable=True)) # Try sending via named exchange c = cluster[0].connect_old() s = c.session(str(qpid.datatypes.uuid4())) s.exchange_bind(exchange="amq.direct", binding_key="foo", queue="q") props = s.delivery_properties(routing_key="foo", delivery_mode=2) s.message_transfer( destination="amq.direct", message=qpid.datatypes.Message(props, "content")) # Now update a new member and compare their dumps. cluster.start(args=["--test-store-dump", "updatee.dump"]) assert readfile("direct.dump") == readfile("updatee.dump") os.remove("direct.dump") os.remove("updatee.dump") def test_sasl(self): """Test SASL authentication and encryption in a cluster""" sasl_config=os.path.join(self.rootdir, "sasl_config") acl=os.path.join(os.getcwd(), "policy.acl") aclf=file(acl,"w") aclf.write(""" acl deny zag@QPID create queue acl allow all all """) aclf.close() cluster = self.cluster(2, args=["--auth", "yes", "--sasl-config", sasl_config, "--load-module", os.getenv("ACL_LIB"), "--acl-file", acl]) # Valid user/password, ensure queue is created. c = cluster[0].connect(username="zig", password="zig") c.session().sender("ziggy;{create:always}") c.close() c = cluster[1].connect(username="zig", password="zig") c.session().receiver("ziggy;{assert:always}") c.close() for b in cluster: b.ready() # Make sure all brokers still running. # Valid user, bad password try: cluster[0].connect(username="zig", password="foo").close() self.fail("Expected exception") except messaging.exceptions.ConnectionError: pass for b in cluster: b.ready() # Make sure all brokers still running. # Bad user ID try: cluster[0].connect(username="foo", password="bar").close() self.fail("Expected exception") except messaging.exceptions.ConnectionError: pass for b in cluster: b.ready() # Make sure all brokers still running. # Action disallowed by ACL c = cluster[0].connect(username="zag", password="zag") try: s = c.session() s.sender("zaggy;{create:always}") s.close() self.fail("Expected exception") except messaging.exceptions.UnauthorizedAccess: pass # make sure the queue was not created at the other node. c = cluster[0].connect(username="zag", password="zag") try: s = c.session() s.sender("zaggy;{assert:always}") s.close() self.fail("Expected exception") except messaging.exceptions.NotFound: pass def test_user_id_update(self): """Ensure that user-id of an open session is updated to new cluster members""" sasl_config=os.path.join(self.rootdir, "sasl_config") cluster = self.cluster(1, args=["--auth", "yes", "--sasl-config", sasl_config,]) c = cluster[0].connect(username="zig", password="zig") s = c.session().sender("q;{create:always}") s.send(Message("x", user_id="zig")) # Message sent before start new broker cluster.start() s.send(Message("y", user_id="zig")) # Messsage sent after start of new broker # Verify brokers are healthy and messages are on the queue. self.assertEqual("x", cluster[0].get_message("q").content) self.assertEqual("y", cluster[1].get_message("q").content) def test_link_events(self): """Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=611543""" args = ["--mgmt-pub-interval", 1] # Publish management information every second. broker1 = self.cluster(1, args)[0] broker2 = self.cluster(1, args)[0] qp = self.popen(["qpid-printevents", broker1.host_port()], EXPECT_RUNNING) qr = self.popen(["qpid-route", "route", "add", broker1.host_port(), broker2.host_port(), "amq.fanout", "key" ], EXPECT_EXIT_OK) # Look for link event in printevents output. retry(lambda: find_in_file("brokerLinkUp", qp.outfile("out"))) broker1.ready() broker2.ready() def test_queue_cleaner(self): """ Regression test to ensure that cleanup of expired messages works correctly """ cluster = self.cluster(2, args=["--queue-purge-interval", 3]) s0 = cluster[0].connect().session() sender = s0.sender("my-lvq; {create: always, node:{x-declare:{arguments:{'qpid.last_value_queue':1}}}}") #send 10 messages that will all expire and be cleaned up for i in range(1, 10): msg = Message("message-%s" % i) msg.properties["qpid.LVQ_key"] = "a" msg.ttl = 0.1 sender.send(msg) #wait for queue cleaner to run time.sleep(3) #test all is ok by sending and receiving a message msg = Message("non-expiring") msg.properties["qpid.LVQ_key"] = "b" sender.send(msg) s0.connection.close() s1 = cluster[1].connect().session() m = s1.receiver("my-lvq", capacity=1).fetch(timeout=1) s1.acknowledge() self.assertEqual("non-expiring", m.content) s1.connection.close() for b in cluster: b.ready() # Make sure all brokers still running. def test_amqfailover_visible(self): """Verify that the amq.failover exchange can be seen by QMF-based tools - regression test for BZ615300.""" broker1 = self.cluster(1)[0] broker2 = self.cluster(1)[0] qs = subprocess.Popen(["qpid-stat", "-e", broker1.host_port()], stdout=subprocess.PIPE) out = qs.communicate()[0] assert out.find("amq.failover") > 0 def evaluate_address(self, session, address): """Create a receiver just to evaluate an address for its side effects""" r = session.receiver(address) r.close() def test_expire_fanout(self): """Regression test for QPID-2874: Clustered broker crashes in assertion in cluster/ExpiryPolicy.cpp. Caused by a fan-out message being updated as separate messages""" cluster = self.cluster(1) session0 = cluster[0].connect().session() # Create 2 queues bound to fanout exchange. self.evaluate_address(session0, "q1;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:q1}]}}") self.evaluate_address(session0, "q2;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:q2}]}}") queues = ["q1", "q2"] # Send a fanout message with a long timeout s = session0.sender("amq.fanout") s.send(Message("foo", ttl=100), sync=False) # Start a new member, check the messages cluster.start() session1 = cluster[1].connect().session() for q in queues: self.assert_browse(session1, "q1", ["foo"]) def test_route_update(self): """Regression test for https://issues.apache.org/jira/browse/QPID-2982 Links and bridges associated with routes were not replicated on update. This meant extra management objects and caused an exit if a management client was attached. """ args=["--mgmt-pub-interval=1","--log-enable=trace+:management"] cluster0 = self.cluster(1, args=args) cluster1 = self.cluster(1, args=args) assert 0 == subprocess.call( ["qpid-route", "route", "add", cluster0[0].host_port(), cluster1[0].host_port(), "dummy-exchange", "dummy-key", "-d"]) cluster0.start() # Wait for qpid-tool:list on cluster0[0] to generate expected output. pattern = re.compile("org.apache.qpid.broker.*link") qpid_tool = subprocess.Popen(["qpid-tool", cluster0[0].host_port()], stdin=subprocess.PIPE, stdout=subprocess.PIPE) class Scanner(Thread): def __init__(self): self.found = False; Thread.__init__(self) def run(self): for l in qpid_tool.stdout: if pattern.search(l): self.found = True; return scanner = Scanner() scanner.start() start = time.time() try: # Wait up to 5 second timeout for scanner to find expected output while not scanner.found and time.time() < start + 5: qpid_tool.stdin.write("list\n") # Ask qpid-tool to list for b in cluster0: b.ready() # Raise if any brokers are down finally: qpid_tool.stdin.write("quit\n") qpid_tool.wait() scanner.join() assert scanner.found # Regression test for https://issues.apache.org/jira/browse/QPID-3235 # Inconsistent stats when changing elder. # Force a change of elder cluster0.start() cluster0[0].kill() time.sleep(2) # Allow a management interval to pass. # Verify logs are consistent cluster_test_logs.verify_logs() def test_redelivered(self): """Verify that redelivered flag is set correctly on replayed messages""" cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL) url = "amqp:tcp:%s,tcp:%s" % (cluster[0].host_port(), cluster[1].host_port()) queue = "my-queue" cluster[0].declare_queue(queue) self.sender = self.popen( ["qpid-send", "--broker", url, "--address", queue, "--sequence=true", "--send-eos=1", "--messages=100000", "--connection-options={reconnect:true}" ]) self.receiver = self.popen( ["qpid-receive", "--broker", url, "--address", queue, "--ignore-duplicates", "--check-redelivered", "--connection-options={reconnect:true}", "--forever" ]) time.sleep(1)#give sender enough time to have some messages to replay cluster[0].kill() self.sender.wait() self.receiver.wait() cluster[1].kill() class BlockedSend(Thread): """Send a message, send is expected to block. Verify that it does block (for a given timeout), then allow waiting till it unblocks when it is expected to do so.""" def __init__(self, sender, msg): self.sender, self.msg = sender, msg self.blocked = True self.condition = Condition() self.timeout = 0.1 # Time to wait for expected results. Thread.__init__(self) def run(self): try: self.sender.send(self.msg, sync=True) self.condition.acquire() try: self.blocked = False self.condition.notify() finally: self.condition.release() except Exception,e: print "BlockedSend exception: %s"%e def start(self): Thread.start(self) time.sleep(self.timeout) assert self.blocked # Expected to block def assert_blocked(self): assert self.blocked def wait(self): # Now expecting to unblock self.condition.acquire() try: while self.blocked: self.condition.wait(self.timeout) if self.blocked: raise Exception("Timed out waiting for send to unblock") finally: self.condition.release() self.join() def queue_flowlimit_test(self, brokers): """Verify that the queue's flowlimit configuration and state are correctly replicated. The brokers argument allows this test to run on single broker, cluster of 2 pre-startd brokers or cluster where second broker starts after queue is in flow control. """ # configure a queue with a specific flow limit on first broker ssn0 = brokers.first().connect().session() s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}") brokers.first().startQmf() q1 = [q for q in brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0] oid = q1.getObjectId() self.assertEqual(q1.name, "flq") self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) assert not q1.flowStopped self.assertEqual(q1.flowStoppedCount, 0) # fill the queue on one broker until flow control is active for x in range(5): s0.send(Message(str(x))) sender = ShortTests.BlockedSend(s0, Message(str(6))) sender.start() # Tests that sender does block # Verify the broker queue goes into a flowStopped state deadline = time.time() + 1 while not q1.flowStopped and time.time() < deadline: q1.update() assert q1.flowStopped self.assertEqual(q1.flowStoppedCount, 1) sender.assert_blocked() # Still blocked # Now verify the both brokers in cluster have same configuration brokers.second().startQmf() qs = brokers.second().qmf_session.getObjects(_objectId=oid) self.assertEqual(len(qs), 1) q2 = qs[0] self.assertEqual(q2.name, "flq") self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) assert q2.flowStopped self.assertEqual(q2.flowStoppedCount, 1) # now drain the queue using a session to the other broker ssn1 = brokers.second().connect().session() r1 = ssn1.receiver("flq", capacity=6) for x in range(4): r1.fetch(timeout=0) ssn1.acknowledge() sender.wait() # Verify no longer blocked. # and re-verify state of queue on both brokers q1.update() assert not q1.flowStopped q2.update() assert not q2.flowStopped ssn0.connection.close() ssn1.connection.close() cluster_test_logs.verify_logs() def test_queue_flowlimit(self): """Test flow limits on a standalone broker""" broker = self.broker() class Brokers: def first(self): return broker def second(self): return broker self.queue_flowlimit_test(Brokers()) def test_queue_flowlimit_cluster(self): cluster = self.cluster(2) class Brokers: def first(self): return cluster[0] def second(self): return cluster[1] self.queue_flowlimit_test(Brokers()) def test_queue_flowlimit_cluster_join(self): cluster = self.cluster(1) class Brokers: def first(self): return cluster[0] def second(self): if len(cluster) == 1: cluster.start() return cluster[1] self.queue_flowlimit_test(Brokers()) def test_queue_flowlimit_replicate(self): """ Verify that a queue which is in flow control BUT has drained BELOW the flow control 'stop' threshold, is correctly replicated when a new broker is added to the cluster. """ class AsyncSender(Thread): """Send a fixed number of msgs from a sender in a separate thread so it may block without blocking the test. """ def __init__(self, broker, address, count=1, size=4): Thread.__init__(self) self.daemon = True self.broker = broker self.queue = address self.count = count self.size = size self.done = False def run(self): self.sender = subprocess.Popen(["qpid-send", "--capacity=1", "--content-size=%s" % self.size, "--messages=%s" % self.count, "--failover-updates", "--connection-options={reconnect:true}", "--address=%s" % self.queue, "--broker=%s" % self.broker.host_port()]) self.sender.wait() self.done = True cluster = self.cluster(2) # create a queue with rather draconian flow control settings ssn0 = cluster[0].connect().session() s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':100, 'qpid.flow_resume_count':20}}}}") # fire off the sending thread to broker[0], and wait until the queue # hits flow control on broker[1] sender = AsyncSender(cluster[0], "flq", count=110); sender.start(); cluster[1].startQmf() q_obj = [q for q in cluster[1].qmf_session.getObjects(_class="queue") if q.name == "flq"][0] deadline = time.time() + 10 while not q_obj.flowStopped and time.time() < deadline: q_obj.update() assert q_obj.flowStopped assert not sender.done assert q_obj.msgDepth < 110 # Now drain enough messages on broker[1] to drop below the flow stop # threshold, but not relieve flow control... receiver = subprocess.Popen(["qpid-receive", "--messages=15", "--timeout=1", "--print-content=no", "--failover-updates", "--connection-options={reconnect:true}", "--ack-frequency=1", "--address=flq", "--broker=%s" % cluster[1].host_port()]) receiver.wait() q_obj.update() assert q_obj.flowStopped assert not sender.done current_depth = q_obj.msgDepth # add a new broker to the cluster, and verify that the queue is in flow # control on that broker cluster.start() cluster[2].startQmf() q_obj = [q for q in cluster[2].qmf_session.getObjects(_class="queue") if q.name == "flq"][0] assert q_obj.flowStopped assert q_obj.msgDepth == current_depth # now drain the queue on broker[2], and verify that the sender becomes # unblocked receiver = subprocess.Popen(["qpid-receive", "--messages=95", "--timeout=1", "--print-content=no", "--failover-updates", "--connection-options={reconnect:true}", "--ack-frequency=1", "--address=flq", "--broker=%s" % cluster[2].host_port()]) receiver.wait() q_obj.update() assert not q_obj.flowStopped assert q_obj.msgDepth == 0 # verify that the sender has become unblocked sender.join(timeout=5) assert not sender.isAlive() assert sender.done def test_blocked_queue_delete(self): """Verify that producers which are blocked on a queue due to flow control are unblocked when that queue is deleted. """ cluster = self.cluster(2) cluster[0].startQmf() cluster[1].startQmf() # configure a queue with a specific flow limit on first broker ssn0 = cluster[0].connect().session() s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}") q1 = [q for q in cluster[0].qmf_session.getObjects(_class="queue") if q.name == "flq"][0] oid = q1.getObjectId() self.assertEqual(q1.name, "flq") self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) assert not q1.flowStopped self.assertEqual(q1.flowStoppedCount, 0) # fill the queue on one broker until flow control is active for x in range(5): s0.send(Message(str(x))) sender = ShortTests.BlockedSend(s0, Message(str(6))) sender.start() # Tests that sender does block # Verify the broker queue goes into a flowStopped state deadline = time.time() + 1 while not q1.flowStopped and time.time() < deadline: q1.update() assert q1.flowStopped self.assertEqual(q1.flowStoppedCount, 1) sender.assert_blocked() # Still blocked # Now verify the both brokers in cluster have same configuration qs = cluster[1].qmf_session.getObjects(_objectId=oid) self.assertEqual(len(qs), 1) q2 = qs[0] self.assertEqual(q2.name, "flq") self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) assert q2.flowStopped self.assertEqual(q2.flowStoppedCount, 1) # now delete the blocked queue from other broker ssn1 = cluster[1].connect().session() self.evaluate_address(ssn1, "flq;{delete:always}") sender.wait() # Verify no longer blocked. ssn0.connection.close() ssn1.connection.close() cluster_test_logs.verify_logs() def test_alternate_exchange_update(self): """Verify that alternate-exchange on exchanges and queues is propagated to new members of a cluster. """ cluster = self.cluster(1) s0 = cluster[0].connect().session() # create alt queue bound to amq.fanout exchange, will be destination for alternate exchanges self.evaluate_address(s0, "alt;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:alt}]}}") # create direct exchange ex with alternate-exchange amq.fanout and no queues bound self.evaluate_address(s0, "ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'amq.fanout'}}}") # create queue q with alternate-exchange amq.fanout self.evaluate_address(s0, "q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'amq.fanout'}}}") def verify(broker): s = broker.connect().session() # Verify unmatched message goes to ex's alternate. s.sender("ex").send("foo") self.assertEqual("foo", s.receiver("alt").fetch(timeout=0).content) # Verify rejected message goes to q's alternate. s.sender("q").send("bar") msg = s.receiver("q").fetch(timeout=0) self.assertEqual("bar", msg.content) s.acknowledge(msg, Disposition(REJECTED)) # Reject the message self.assertEqual("bar", s.receiver("alt").fetch(timeout=0).content) verify(cluster[0]) cluster.start() verify(cluster[1]) def test_binding_order(self): """Regression test for binding order inconsistency in cluster""" cluster = self.cluster(1) c0 = cluster[0].connect() s0 = c0.session() # Declare multiple queues bound to same key on amq.topic def declare(q,max=0): if max: declare = 'x-declare:{arguments:{"qpid.max_count":%d, "qpid.flow_stop_count":0}}'%max else: declare = 'x-declare:{}' bind='x-bindings:[{queue:%s,key:key,exchange:"amq.topic"}]'%(q) s0.sender("%s;{create:always,node:{%s,%s}}" % (q,declare,bind)) declare('d',max=4) # Only one with a limit for q in ['c', 'b','a']: declare(q) # Add a cluster member, send enough messages to exceed the max count cluster.start() try: s = s0.sender('amq.topic/key') for m in xrange(1,6): s.send(Message(str(m))) self.fail("Expected capacity exceeded exception") except messaging.exceptions.TargetCapacityExceeded: pass c1 = cluster[1].connect() s1 = c1.session() s0 = c0.session() # Old session s0 is broken by exception. # Verify queue contents are consistent. for q in ['a','b','c','d']: self.assertEqual(self.browse(s0, q), self.browse(s1, q)) # Verify queue contents are "best effort" for q in ['a','b','c']: self.assert_browse(s1,q,[str(n) for n in xrange(1,6)]) self.assert_browse(s1,'d',[str(n) for n in xrange(1,5)]) def test_deleted_exchange(self): """QPID-3215: cached exchange reference can cause cluster inconsistencies if exchange is deleted/recreated Verify stand-alone case """ cluster = self.cluster() # Verify we do not route message via an exchange that has been destroyed. cluster.start() s0 = cluster[0].connect().session() self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}") self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}") send0 = s0.sender("ex/foo") send0.send("foo") self.assert_browse(s0, "q", ["foo"]) self.evaluate_address(s0, "ex;{delete:always}") try: send0.send("bar") # Should fail, exchange is deleted. self.fail("Expected not-found exception") except qpid.messaging.NotFound: pass self.assert_browse(cluster[0].connect().session(), "q", ["foo"]) def test_deleted_exchange_inconsistent(self): """QPID-3215: cached exchange reference can cause cluster inconsistencies if exchange is deleted/recreated Verify cluster inconsistency. """ cluster = self.cluster() cluster.start() s0 = cluster[0].connect().session() self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}") self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}") send0 = s0.sender("ex/foo") send0.send("foo") self.assert_browse(s0, "q", ["foo"]) cluster.start() s1 = cluster[1].connect().session() self.evaluate_address(s0, "ex;{delete:always}") try: send0.send("bar") self.fail("Expected not-found exception") except qpid.messaging.NotFound: pass self.assert_browse(s1, "q", ["foo"]) class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION= is set""" def duration(self): d = self.config.defines.get("DURATION") if d: return float(d)*60 else: return 3 # Default is to be quick def test_failover(self): """Test fail-over during continuous send-receive with errors""" # Original cluster will all be killed so expect exit with failure cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL) for b in cluster: ErrorGenerator(b) # Start sender and receiver threads cluster[0].declare_queue("test-queue") sender = NumberedSender(cluster[1], 1000) # Max queue depth receiver = NumberedReceiver(cluster[2], sender) receiver.start() sender.start() # Kill original brokers, start new ones for the duration. endtime = time.time() + self.duration() i = 0 while time.time() < endtime: cluster[i].kill() i += 1 b = cluster.start(expect=EXPECT_EXIT_FAIL) ErrorGenerator(b) time.sleep(5) sender.stop() receiver.stop() for i in range(i, len(cluster)): cluster[i].kill() def test_management(self, args=[]): """ Stress test: Run management clients and other clients concurrently while killing and restarting brokers. """ class ClientLoop(StoppableThread): """Run a client executable in a loop.""" def __init__(self, broker, cmd): StoppableThread.__init__(self) self.broker=broker self.cmd = cmd # Client command. self.lock = Lock() self.process = None # Client process. self.start() def run(self): try: while True: self.lock.acquire() try: if self.stopped: break self.process = self.broker.test.popen( self.cmd, expect=EXPECT_UNKNOWN) finally: self.lock.release() try: exit = self.process.wait() except OSError, e: # Process may already have been killed by self.stop() break except Exception, e: self.process.unexpected( "client of %s: %s"%(self.broker.name, e)) self.lock.acquire() try: if self.stopped: break if exit != 0: self.process.unexpected( "client of %s exit code %s"%(self.broker.name, exit)) finally: self.lock.release() except Exception, e: self.error = RethrownException("Error in ClientLoop.run") def stop(self): """Stop the running client and wait for it to exit""" self.lock.acquire() try: if self.stopped: return self.stopped = True if self.process: try: self.process.kill() # Kill the client. except OSError: pass # The client might not be running. finally: self.lock.release() StoppableThread.stop(self) # body of test_management() args += ["--mgmt-pub-interval", 1] args += ["--log-enable=trace+:management"] # Use store if present. if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib] cluster = self.cluster(3, args) clients = [] # Per-broker list of clients that only connect to one broker. mclients = [] # Management clients that connect to every broker in the cluster. def start_clients(broker): """Start ordinary clients for a broker.""" cmds=[ ["qpid-tool", "localhost:%s"%(broker.port())], ["qpid-perftest", "--count=5000", "--durable=yes", "--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()], ["qpid-txtest", "--queue-base-name", "tx-%s"%str(qpid.datatypes.uuid4()), "--port", broker.port()], ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())], ["testagent", "localhost", str(broker.port())] ] clients.append([ClientLoop(broker, cmd) for cmd in cmds]) def start_mclients(broker): """Start management clients that make multiple connections.""" cmd = ["qpid-stat", "-b", "localhost:%s" %(broker.port())] mclients.append(ClientLoop(broker, cmd)) endtime = time.time() + self.duration() # For long duration, first run is a quarter of the duration. runtime = max(5, self.duration() / 4.0) alive = 0 # First live cluster member for i in range(len(cluster)): start_clients(cluster[i]) start_mclients(cluster[alive]) while time.time() < endtime: time.sleep(runtime) runtime = 5 # Remaining runs 5 seconds, frequent broker kills for b in cluster[alive:]: b.ready() # Check if a broker crashed. # Kill the first broker, expect the clients to fail. b = cluster[alive] b.expect = EXPECT_EXIT_FAIL b.kill() # Stop the brokers clients and all the mclients. for c in clients[alive] + mclients: try: c.stop() except: pass # Ignore expected errors due to broker shutdown. clients[alive] = [] mclients = [] # Start another broker and clients alive += 1 cluster.start() start_clients(cluster[-1]) start_mclients(cluster[alive]) for c in chain(mclients, *clients): c.stop() # Verify that logs are consistent cluster_test_logs.verify_logs() def test_management_qmf2(self): self.test_management(args=["--mgmt-qmf2=yes"]) def test_connect_consistent(self): args=["--mgmt-pub-interval=1","--log-enable=trace+:management"] cluster = self.cluster(2, args=args) end = time.time() + self.duration() while (time.time() < end): # Get a management interval for i in xrange(1000): cluster[0].connect().close() cluster_test_logs.verify_logs() def test_flowlimit_failover(self): """Test fail-over during continuous send-receive with flow control active. """ # Original cluster will all be killed so expect exit with failure cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL) #for b in cluster: ErrorGenerator(b) # create a queue with rather draconian flow control settings ssn0 = cluster[0].connect().session() s0 = ssn0.sender("test-queue; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':2000, 'qpid.flow_resume_count':100}}}}") receiver = NumberedReceiver(cluster[2]) receiver.start() senders = [NumberedSender(cluster[i]) for i in range(1,3)] for s in senders: s.start() # Kill original brokers, start new ones for the duration. endtime = time.time() + self.duration(); i = 0 while time.time() < endtime: cluster[i].kill() i += 1 b = cluster.start(expect=EXPECT_EXIT_FAIL) #ErrorGenerator(b) time.sleep(5) #b = cluster[0] #b.startQmf() for s in senders: s.stop() receiver.stop() for i in range(i, len(cluster)): cluster[i].kill() class StoreTests(BrokerTest): """ Cluster tests that can only be run if there is a store available. """ def args(self): assert BrokerTest.store_lib return ["--load-module", BrokerTest.store_lib] def test_store_loaded(self): """Ensure we are indeed loading a working store""" broker = self.broker(self.args(), name="recoverme", expect=EXPECT_EXIT_FAIL) m = Message("x", durable=True) broker.send_message("q", m) broker.kill() broker = self.broker(self.args(), name="recoverme") self.assertEqual("x", broker.get_message("q").content) def test_kill_restart(self): """Verify we can kill/resetart a broker with store in a cluster""" cluster = self.cluster(1, self.args()) cluster.start("restartme", expect=EXPECT_EXIT_FAIL).kill() # Send a message, retrieve from the restarted broker cluster[0].send_message("q", "x") m = cluster.start("restartme").get_message("q") self.assertEqual("x", m.content) def stop_cluster(self,broker): """Clean shut-down of a cluster""" self.assertEqual(0, qpid_cluster.main( ["-kf", broker.host_port()])) def test_persistent_restart(self): """Verify persistent cluster shutdown/restart scenarios""" cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"]) a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False) c = cluster.start("c", expect=EXPECT_EXIT_FAIL, wait=True) a.send_message("q", Message("1", durable=True)) # Kill & restart one member. c.kill() self.assertEqual(a.get_message("q").content, "1") a.send_message("q", Message("2", durable=True)) c = cluster.start("c", expect=EXPECT_EXIT_OK) self.assertEqual(c.get_message("q").content, "2") # Shut down the entire cluster cleanly and bring it back up a.send_message("q", Message("3", durable=True)) self.stop_cluster(a) a = cluster.start("a", wait=False) b = cluster.start("b", wait=False) c = cluster.start("c", wait=True) self.assertEqual(a.get_message("q").content, "3") def test_persistent_partial_failure(self): # Kill 2 members, shut down the last cleanly then restart # Ensure we use the clean database cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"]) a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False) b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False) c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=True) a.send_message("q", Message("4", durable=True)) a.kill() b.kill() self.assertEqual(c.get_message("q").content, "4") c.send_message("q", Message("clean", durable=True)) self.stop_cluster(c) a = cluster.start("a", wait=False) b = cluster.start("b", wait=False) c = cluster.start("c", wait=True) self.assertEqual(a.get_message("q").content, "clean") def test_wrong_cluster_id(self): # Start a cluster1 broker, then try to restart in cluster2 cluster1 = self.cluster(0, args=self.args()) a = cluster1.start("a", expect=EXPECT_EXIT_OK) a.terminate() cluster2 = self.cluster(1, args=self.args()) try: a = cluster2.start("a", expect=EXPECT_EXIT_FAIL) a.ready() self.fail("Expected exception") except: pass def test_wrong_shutdown_id(self): # Start 2 members and shut down. cluster = self.cluster(0, args=self.args()+["--cluster-size=2"]) a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False) self.stop_cluster(a) self.assertEqual(a.wait(), 0) self.assertEqual(b.wait(), 0) # Restart with a different member and shut down. a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=False) self.stop_cluster(a) self.assertEqual(a.wait(), 0) self.assertEqual(c.wait(), 0) # Mix members from both shutdown events, they should fail # TODO aconway 2010-03-11: can't predict the exit status of these # as it depends on the order of delivery of initial-status messages. # See comment at top of this file. a = cluster.start("a", expect=EXPECT_UNKNOWN, wait=False) b = cluster.start("b", expect=EXPECT_UNKNOWN, wait=False) self.assertRaises(Exception, lambda: a.ready()) self.assertRaises(Exception, lambda: b.ready()) def test_solo_store_clean(self): # A single node cluster should always leave a clean store. cluster = self.cluster(0, self.args()) a = cluster.start("a", expect=EXPECT_EXIT_FAIL) a.send_message("q", Message("x", durable=True)) a.kill() a = cluster.start("a") self.assertEqual(a.get_message("q").content, "x") def test_last_store_clean(self): # Verify that only the last node in a cluster to shut down has # a clean store. Start with cluster of 3, reduce to 1 then # increase again to ensure that a node that was once alone but # finally did not finish as the last node does not get a clean # store. cluster = self.cluster(0, self.args()) a = cluster.start("a", expect=EXPECT_EXIT_FAIL) self.assertEqual(a.store_state(), "clean") b = cluster.start("b", expect=EXPECT_EXIT_FAIL) c = cluster.start("c", expect=EXPECT_EXIT_FAIL) self.assertEqual(b.store_state(), "dirty") self.assertEqual(c.store_state(), "dirty") retry(lambda: a.store_state() == "dirty") a.send_message("q", Message("x", durable=True)) a.kill() b.kill() # c is last man, will mark store clean retry(lambda: c.store_state() == "clean") a = cluster.start("a", expect=EXPECT_EXIT_FAIL) # c no longer last man retry(lambda: c.store_state() == "dirty") c.kill() # a is now last man retry(lambda: a.store_state() == "clean") a.kill() self.assertEqual(a.store_state(), "clean") self.assertEqual(b.store_state(), "dirty") self.assertEqual(c.store_state(), "dirty") def test_restart_clean(self): """Verify that we can re-start brokers one by one in a persistent cluster after a clean oshutdown""" cluster = self.cluster(0, self.args()) a = cluster.start("a", expect=EXPECT_EXIT_OK) b = cluster.start("b", expect=EXPECT_EXIT_OK) c = cluster.start("c", expect=EXPECT_EXIT_OK) a.send_message("q", Message("x", durable=True)) self.stop_cluster(a) a = cluster.start("a") b = cluster.start("b") c = cluster.start("c") self.assertEqual(c.get_message("q").content, "x") def test_join_sub_size(self): """Verify that after starting a cluster with cluster-size=N, we can join new members even if size < N-1""" cluster = self.cluster(0, self.args()+["--cluster-size=3"]) a = cluster.start("a", wait=False, expect=EXPECT_EXIT_FAIL) b = cluster.start("b", wait=False, expect=EXPECT_EXIT_FAIL) c = cluster.start("c") a.send_message("q", Message("x", durable=True)) a.send_message("q", Message("y", durable=True)) a.kill() b.kill() a = cluster.start("a") self.assertEqual(c.get_message("q").content, "x") b = cluster.start("b") self.assertEqual(c.get_message("q").content, "y")