diff options
Diffstat (limited to 'qpid/cpp/src/tests/cluster_tests.py')
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 1057 |
1 files changed, 1057 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py new file mode 100755 index 0000000000..593791297a --- /dev/null +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -0,0 +1,1057 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os, signal, sys, time, imp, re, subprocess, glob, cluster_test_logs +from qpid import datatypes, messaging +from brokertest import * +from qpid.harness import Skipped +from qpid.messaging import Message, Empty, Disposition, REJECTED +from threading import Thread, Lock, Condition +from logging import getLogger +from itertools import chain +from tempfile import NamedTemporaryFile + +log = getLogger("qpid.cluster_tests") + +# Note: brokers that shut themselves down due to critical error during +# normal operation will still have an exit code of 0. Brokers that +# shut down because of an error found during initialize will exit with +# a non-0 code. Hence the apparently inconsistent use of EXPECT_EXIT_OK +# and EXPECT_EXIT_FAIL in some of the tests below. + +# TODO aconway 2010-03-11: resolve this - ideally any exit due to an error +# should give non-0 exit status. + +# Import scripts as modules +qpid_cluster=import_script(checkenv("QPID_CLUSTER_EXEC")) + +def readfile(filename): + """Returns te content of file named filename as a string""" + f = file(filename) + try: return f.read() + finally: f.close() + +class ShortTests(BrokerTest): + """Short cluster functionality tests.""" + + def test_message_replication(self): + """Test basic cluster message replication.""" + # Start a cluster, send some messages to member 0. + cluster = self.cluster(2) + s0 = cluster[0].connect().session() + s0.sender("q; {create:always}").send(Message("x")) + s0.sender("q; {create:always}").send(Message("y")) + s0.connection.close() + + # Verify messages available on member 1. + s1 = cluster[1].connect().session() + m = s1.receiver("q", capacity=1).fetch(timeout=1) + s1.acknowledge() + self.assertEqual("x", m.content) + s1.connection.close() + + # Start member 2 and verify messages available. + s2 = cluster.start().connect().session() + m = s2.receiver("q", capacity=1).fetch(timeout=1) + s2.acknowledge() + self.assertEqual("y", m.content) + s2.connection.close() + + def test_store_direct_update_match(self): + """Verify that brokers stores an identical message whether they receive it + direct from clients or during an update, no header or other differences""" + cluster = self.cluster(0, args=["--load-module", self.test_store_lib]) + cluster.start(args=["--test-store-dump", "direct.dump"]) + # Try messages with various headers + cluster[0].send_message("q", Message(durable=True, content="foobar", + subject="subject", + reply_to="reply_to", + properties={"n":10})) + # Try messages of different sizes + for size in range(0,10000,100): + cluster[0].send_message("q", Message(content="x"*size, durable=True)) + # Try sending via named exchange + c = cluster[0].connect_old() + s = c.session(str(qpid.datatypes.uuid4())) + s.exchange_bind(exchange="amq.direct", binding_key="foo", queue="q") + props = s.delivery_properties(routing_key="foo", delivery_mode=2) + s.message_transfer( + destination="amq.direct", + message=qpid.datatypes.Message(props, "content")) + + # Now update a new member and compare their dumps. + cluster.start(args=["--test-store-dump", "updatee.dump"]) + assert readfile("direct.dump") == readfile("updatee.dump") + os.remove("direct.dump") + os.remove("updatee.dump") + + def test_sasl(self): + """Test SASL authentication and encryption in a cluster""" + sasl_config=os.path.join(self.rootdir, "sasl_config") + acl=os.path.join(os.getcwd(), "policy.acl") + aclf=file(acl,"w") + aclf.write(""" +acl deny zag@QPID create queue +acl allow all all +""") + aclf.close() + cluster = self.cluster(2, args=["--auth", "yes", + "--sasl-config", sasl_config, + "--load-module", os.getenv("ACL_LIB"), + "--acl-file", acl]) + + # Valid user/password, ensure queue is created. + c = cluster[0].connect(username="zig", password="zig") + c.session().sender("ziggy;{create:always}") + c.close() + c = cluster[1].connect(username="zig", password="zig") + c.session().receiver("ziggy;{assert:always}") + c.close() + for b in cluster: b.ready() # Make sure all brokers still running. + + # Valid user, bad password + try: + cluster[0].connect(username="zig", password="foo").close() + self.fail("Expected exception") + except messaging.exceptions.ConnectionError: pass + for b in cluster: b.ready() # Make sure all brokers still running. + + # Bad user ID + try: + cluster[0].connect(username="foo", password="bar").close() + self.fail("Expected exception") + except messaging.exceptions.ConnectionError: pass + for b in cluster: b.ready() # Make sure all brokers still running. + + # Action disallowed by ACL + c = cluster[0].connect(username="zag", password="zag") + try: + s = c.session() + s.sender("zaggy;{create:always}") + s.close() + self.fail("Expected exception") + except messaging.exceptions.UnauthorizedAccess: pass + # make sure the queue was not created at the other node. + c = cluster[0].connect(username="zag", password="zag") + try: + s = c.session() + s.sender("zaggy;{assert:always}") + s.close() + self.fail("Expected exception") + except messaging.exceptions.NotFound: pass + + def test_user_id_update(self): + """Ensure that user-id of an open session is updated to new cluster members""" + sasl_config=os.path.join(self.rootdir, "sasl_config") + cluster = self.cluster(1, args=["--auth", "yes", "--sasl-config", sasl_config,]) + c = cluster[0].connect(username="zig", password="zig") + s = c.session().sender("q;{create:always}") + s.send(Message("x", user_id="zig")) # Message sent before start new broker + cluster.start() + s.send(Message("y", user_id="zig")) # Messsage sent after start of new broker + # Verify brokers are healthy and messages are on the queue. + self.assertEqual("x", cluster[0].get_message("q").content) + self.assertEqual("y", cluster[1].get_message("q").content) + + def test_link_events(self): + """Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=611543""" + args = ["--mgmt-pub-interval", 1] # Publish management information every second. + broker1 = self.cluster(1, args)[0] + broker2 = self.cluster(1, args)[0] + qp = self.popen(["qpid-printevents", broker1.host_port()], EXPECT_RUNNING) + qr = self.popen(["qpid-route", "route", "add", + broker1.host_port(), broker2.host_port(), + "amq.fanout", "key" + ], EXPECT_EXIT_OK) + # Look for link event in printevents output. + retry(lambda: find_in_file("brokerLinkUp", qp.outfile("out"))) + broker1.ready() + broker2.ready() + + def test_queue_cleaner(self): + """ Regression test to ensure that cleanup of expired messages works correctly """ + cluster = self.cluster(2, args=["--queue-purge-interval", 3]) + + s0 = cluster[0].connect().session() + sender = s0.sender("my-lvq; {create: always, node:{x-declare:{arguments:{'qpid.last_value_queue':1}}}}") + #send 10 messages that will all expire and be cleaned up + for i in range(1, 10): + msg = Message("message-%s" % i) + msg.properties["qpid.LVQ_key"] = "a" + msg.ttl = 0.1 + sender.send(msg) + #wait for queue cleaner to run + time.sleep(3) + + #test all is ok by sending and receiving a message + msg = Message("non-expiring") + msg.properties["qpid.LVQ_key"] = "b" + sender.send(msg) + s0.connection.close() + s1 = cluster[1].connect().session() + m = s1.receiver("my-lvq", capacity=1).fetch(timeout=1) + s1.acknowledge() + self.assertEqual("non-expiring", m.content) + s1.connection.close() + + for b in cluster: b.ready() # Make sure all brokers still running. + + + def test_amqfailover_visible(self): + """Verify that the amq.failover exchange can be seen by + QMF-based tools - regression test for BZ615300.""" + broker1 = self.cluster(1)[0] + broker2 = self.cluster(1)[0] + qs = subprocess.Popen(["qpid-stat", "-e", broker1.host_port()], stdout=subprocess.PIPE) + out = qs.communicate()[0] + assert out.find("amq.failover") > 0 + + def evaluate_address(self, session, address): + """Create a receiver just to evaluate an address for its side effects""" + r = session.receiver(address) + r.close() + + def test_expire_fanout(self): + """Regression test for QPID-2874: Clustered broker crashes in assertion in + cluster/ExpiryPolicy.cpp. + Caused by a fan-out message being updated as separate messages""" + cluster = self.cluster(1) + session0 = cluster[0].connect().session() + # Create 2 queues bound to fanout exchange. + self.evaluate_address(session0, "q1;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:q1}]}}") + self.evaluate_address(session0, "q2;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:q2}]}}") + queues = ["q1", "q2"] + # Send a fanout message with a long timeout + s = session0.sender("amq.fanout") + s.send(Message("foo", ttl=100), sync=False) + # Start a new member, check the messages + cluster.start() + session1 = cluster[1].connect().session() + for q in queues: self.assert_browse(session1, "q1", ["foo"]) + + def test_route_update(self): + """Regression test for https://issues.apache.org/jira/browse/QPID-2982 + Links and bridges associated with routes were not replicated on update. + This meant extra management objects and caused an exit if a management + client was attached. + """ + args=["--mgmt-pub-interval=1","--log-enable=trace+:management"] + cluster0 = self.cluster(1, args=args) + cluster1 = self.cluster(1, args=args) + assert 0 == subprocess.call( + ["qpid-route", "route", "add", cluster0[0].host_port(), + cluster1[0].host_port(), "dummy-exchange", "dummy-key", "-d"]) + cluster0.start() + + # Wait for qpid-tool:list on cluster0[0] to generate expected output. + pattern = re.compile("org.apache.qpid.broker.*link") + qpid_tool = subprocess.Popen(["qpid-tool", cluster0[0].host_port()], + stdin=subprocess.PIPE, stdout=subprocess.PIPE) + class Scanner(Thread): + def __init__(self): self.found = False; Thread.__init__(self) + def run(self): + for l in qpid_tool.stdout: + if pattern.search(l): self.found = True; return + scanner = Scanner() + scanner.start() + start = time.time() + try: + # Wait up to 5 second timeout for scanner to find expected output + while not scanner.found and time.time() < start + 5: + qpid_tool.stdin.write("list\n") # Ask qpid-tool to list + for b in cluster0: b.ready() # Raise if any brokers are down + finally: + qpid_tool.stdin.write("quit\n") + qpid_tool.wait() + scanner.join() + assert scanner.found + # Regression test for https://issues.apache.org/jira/browse/QPID-3235 + # Inconsistent stats when changing elder. + + # Force a change of elder + cluster0.start() + cluster0[0].kill() + time.sleep(2) # Allow a management interval to pass. + # Verify logs are consistent + cluster_test_logs.verify_logs() + + def test_redelivered(self): + """Verify that redelivered flag is set correctly on replayed messages""" + cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL) + url = "amqp:tcp:%s,tcp:%s" % (cluster[0].host_port(), cluster[1].host_port()) + queue = "my-queue" + cluster[0].declare_queue(queue) + self.sender = self.popen( + ["qpid-send", + "--broker", url, + "--address", queue, + "--sequence=true", + "--send-eos=1", + "--messages=100000", + "--connection-options={reconnect:true}" + ]) + self.receiver = self.popen( + ["qpid-receive", + "--broker", url, + "--address", queue, + "--ignore-duplicates", + "--check-redelivered", + "--connection-options={reconnect:true}", + "--forever" + ]) + time.sleep(1)#give sender enough time to have some messages to replay + cluster[0].kill() + self.sender.wait() + self.receiver.wait() + cluster[1].kill() + + class BlockedSend(Thread): + """Send a message, send is expected to block. + Verify that it does block (for a given timeout), then allow + waiting till it unblocks when it is expected to do so.""" + def __init__(self, sender, msg): + self.sender, self.msg = sender, msg + self.blocked = True + self.condition = Condition() + self.timeout = 0.1 # Time to wait for expected results. + Thread.__init__(self) + def run(self): + try: + self.sender.send(self.msg, sync=True) + self.condition.acquire() + try: + self.blocked = False + self.condition.notify() + finally: self.condition.release() + except Exception,e: print "BlockedSend exception: %s"%e + def start(self): + Thread.start(self) + time.sleep(self.timeout) + assert self.blocked # Expected to block + def assert_blocked(self): assert self.blocked + def wait(self): # Now expecting to unblock + self.condition.acquire() + try: + while self.blocked: + self.condition.wait(self.timeout) + if self.blocked: raise Exception("Timed out waiting for send to unblock") + finally: self.condition.release() + self.join() + + def queue_flowlimit_test(self, brokers): + """Verify that the queue's flowlimit configuration and state are + correctly replicated. + The brokers argument allows this test to run on single broker, + cluster of 2 pre-startd brokers or cluster where second broker + starts after queue is in flow control. + """ + # configure a queue with a specific flow limit on first broker + ssn0 = brokers.first().connect().session() + s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}") + brokers.first().startQmf() + q1 = [q for q in brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0] + oid = q1.getObjectId() + self.assertEqual(q1.name, "flq") + self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) + assert not q1.flowStopped + self.assertEqual(q1.flowStoppedCount, 0) + + # fill the queue on one broker until flow control is active + for x in range(5): s0.send(Message(str(x))) + sender = ShortTests.BlockedSend(s0, Message(str(6))) + sender.start() # Tests that sender does block + # Verify the broker queue goes into a flowStopped state + deadline = time.time() + 1 + while not q1.flowStopped and time.time() < deadline: q1.update() + assert q1.flowStopped + self.assertEqual(q1.flowStoppedCount, 1) + sender.assert_blocked() # Still blocked + + # Now verify the both brokers in cluster have same configuration + brokers.second().startQmf() + qs = brokers.second().qmf_session.getObjects(_objectId=oid) + self.assertEqual(len(qs), 1) + q2 = qs[0] + self.assertEqual(q2.name, "flq") + self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) + assert q2.flowStopped + self.assertEqual(q2.flowStoppedCount, 1) + + # now drain the queue using a session to the other broker + ssn1 = brokers.second().connect().session() + r1 = ssn1.receiver("flq", capacity=6) + for x in range(4): + r1.fetch(timeout=0) + ssn1.acknowledge() + sender.wait() # Verify no longer blocked. + + # and re-verify state of queue on both brokers + q1.update() + assert not q1.flowStopped + q2.update() + assert not q2.flowStopped + + ssn0.connection.close() + ssn1.connection.close() + cluster_test_logs.verify_logs() + + def test_queue_flowlimit(self): + """Test flow limits on a standalone broker""" + broker = self.broker() + class Brokers: + def first(self): return broker + def second(self): return broker + self.queue_flowlimit_test(Brokers()) + + def test_queue_flowlimit_cluster(self): + cluster = self.cluster(2) + class Brokers: + def first(self): return cluster[0] + def second(self): return cluster[1] + self.queue_flowlimit_test(Brokers()) + + def test_queue_flowlimit_cluster_join(self): + cluster = self.cluster(1) + class Brokers: + def first(self): return cluster[0] + def second(self): + if len(cluster) == 1: cluster.start() + return cluster[1] + self.queue_flowlimit_test(Brokers()) + + def test_queue_flowlimit_replicate(self): + """ Verify that a queue which is in flow control BUT has drained BELOW + the flow control 'stop' threshold, is correctly replicated when a new + broker is added to the cluster. + """ + + class AsyncSender(Thread): + """Send a fixed number of msgs from a sender in a separate thread + so it may block without blocking the test. + """ + def __init__(self, broker, address, count=1, size=4): + Thread.__init__(self) + self.daemon = True + self.broker = broker + self.queue = address + self.count = count + self.size = size + self.done = False + + def run(self): + self.sender = subprocess.Popen(["qpid-send", + "--capacity=1", + "--content-size=%s" % self.size, + "--messages=%s" % self.count, + "--failover-updates", + "--connection-options={reconnect:true}", + "--address=%s" % self.queue, + "--broker=%s" % self.broker.host_port()]) + self.sender.wait() + self.done = True + + cluster = self.cluster(2) + # create a queue with rather draconian flow control settings + ssn0 = cluster[0].connect().session() + s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':100, 'qpid.flow_resume_count':20}}}}") + + # fire off the sending thread to broker[0], and wait until the queue + # hits flow control on broker[1] + sender = AsyncSender(cluster[0], "flq", count=110); + sender.start(); + + cluster[1].startQmf() + q_obj = [q for q in cluster[1].qmf_session.getObjects(_class="queue") if q.name == "flq"][0] + deadline = time.time() + 10 + while not q_obj.flowStopped and time.time() < deadline: + q_obj.update() + assert q_obj.flowStopped + assert not sender.done + assert q_obj.msgDepth < 110 + + # Now drain enough messages on broker[1] to drop below the flow stop + # threshold, but not relieve flow control... + receiver = subprocess.Popen(["qpid-receive", + "--messages=15", + "--timeout=1", + "--print-content=no", + "--failover-updates", + "--connection-options={reconnect:true}", + "--ack-frequency=1", + "--address=flq", + "--broker=%s" % cluster[1].host_port()]) + receiver.wait() + q_obj.update() + assert q_obj.flowStopped + assert not sender.done + current_depth = q_obj.msgDepth + + # add a new broker to the cluster, and verify that the queue is in flow + # control on that broker + cluster.start() + cluster[2].startQmf() + q_obj = [q for q in cluster[2].qmf_session.getObjects(_class="queue") if q.name == "flq"][0] + assert q_obj.flowStopped + assert q_obj.msgDepth == current_depth + + # now drain the queue on broker[2], and verify that the sender becomes + # unblocked + receiver = subprocess.Popen(["qpid-receive", + "--messages=95", + "--timeout=1", + "--print-content=no", + "--failover-updates", + "--connection-options={reconnect:true}", + "--ack-frequency=1", + "--address=flq", + "--broker=%s" % cluster[2].host_port()]) + receiver.wait() + q_obj.update() + assert not q_obj.flowStopped + assert q_obj.msgDepth == 0 + + # verify that the sender has become unblocked + sender.join(timeout=5) + assert not sender.isAlive() + assert sender.done + + def test_blocked_queue_delete(self): + """Verify that producers which are blocked on a queue due to flow + control are unblocked when that queue is deleted. + """ + + cluster = self.cluster(2) + cluster[0].startQmf() + cluster[1].startQmf() + + # configure a queue with a specific flow limit on first broker + ssn0 = cluster[0].connect().session() + s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}") + q1 = [q for q in cluster[0].qmf_session.getObjects(_class="queue") if q.name == "flq"][0] + oid = q1.getObjectId() + self.assertEqual(q1.name, "flq") + self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) + assert not q1.flowStopped + self.assertEqual(q1.flowStoppedCount, 0) + + # fill the queue on one broker until flow control is active + for x in range(5): s0.send(Message(str(x))) + sender = ShortTests.BlockedSend(s0, Message(str(6))) + sender.start() # Tests that sender does block + # Verify the broker queue goes into a flowStopped state + deadline = time.time() + 1 + while not q1.flowStopped and time.time() < deadline: q1.update() + assert q1.flowStopped + self.assertEqual(q1.flowStoppedCount, 1) + sender.assert_blocked() # Still blocked + + # Now verify the both brokers in cluster have same configuration + qs = cluster[1].qmf_session.getObjects(_objectId=oid) + self.assertEqual(len(qs), 1) + q2 = qs[0] + self.assertEqual(q2.name, "flq") + self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) + assert q2.flowStopped + self.assertEqual(q2.flowStoppedCount, 1) + + # now delete the blocked queue from other broker + ssn1 = cluster[1].connect().session() + self.evaluate_address(ssn1, "flq;{delete:always}") + sender.wait() # Verify no longer blocked. + + ssn0.connection.close() + ssn1.connection.close() + cluster_test_logs.verify_logs() + + + def test_alternate_exchange_update(self): + """Verify that alternate-exchange on exchanges and queues is propagated to new members of a cluster. """ + cluster = self.cluster(1) + s0 = cluster[0].connect().session() + # create alt queue bound to amq.fanout exchange, will be destination for alternate exchanges + self.evaluate_address(s0, "alt;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:alt}]}}") + # create direct exchange ex with alternate-exchange amq.fanout and no queues bound + self.evaluate_address(s0, "ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'amq.fanout'}}}") + # create queue q with alternate-exchange amq.fanout + self.evaluate_address(s0, "q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'amq.fanout'}}}") + + def verify(broker): + s = broker.connect().session() + # Verify unmatched message goes to ex's alternate. + s.sender("ex").send("foo") + self.assertEqual("foo", s.receiver("alt").fetch(timeout=0).content) + # Verify rejected message goes to q's alternate. + s.sender("q").send("bar") + msg = s.receiver("q").fetch(timeout=0) + self.assertEqual("bar", msg.content) + s.acknowledge(msg, Disposition(REJECTED)) # Reject the message + self.assertEqual("bar", s.receiver("alt").fetch(timeout=0).content) + + verify(cluster[0]) + cluster.start() + verify(cluster[1]) + + def test_binding_order(self): + """Regression test for binding order inconsistency in cluster""" + cluster = self.cluster(1) + c0 = cluster[0].connect() + s0 = c0.session() + # Declare multiple queues bound to same key on amq.topic + def declare(q,max=0): + if max: declare = 'x-declare:{arguments:{"qpid.max_count":%d, "qpid.flow_stop_count":0}}'%max + else: declare = 'x-declare:{}' + bind='x-bindings:[{queue:%s,key:key,exchange:"amq.topic"}]'%(q) + s0.sender("%s;{create:always,node:{%s,%s}}" % (q,declare,bind)) + declare('d',max=4) # Only one with a limit + for q in ['c', 'b','a']: declare(q) + # Add a cluster member, send enough messages to exceed the max count + cluster.start() + try: + s = s0.sender('amq.topic/key') + for m in xrange(1,6): s.send(Message(str(m))) + self.fail("Expected capacity exceeded exception") + except messaging.exceptions.TargetCapacityExceeded: pass + c1 = cluster[1].connect() + s1 = c1.session() + s0 = c0.session() # Old session s0 is broken by exception. + # Verify queue contents are consistent. + for q in ['a','b','c','d']: + self.assertEqual(self.browse(s0, q), self.browse(s1, q)) + # Verify queue contents are "best effort" + for q in ['a','b','c']: self.assert_browse(s1,q,[str(n) for n in xrange(1,6)]) + self.assert_browse(s1,'d',[str(n) for n in xrange(1,5)]) + + def test_deleted_exchange(self): + """QPID-3215: cached exchange reference can cause cluster inconsistencies + if exchange is deleted/recreated + Verify stand-alone case + """ + cluster = self.cluster() + # Verify we do not route message via an exchange that has been destroyed. + cluster.start() + s0 = cluster[0].connect().session() + self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}") + self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}") + send0 = s0.sender("ex/foo") + send0.send("foo") + self.assert_browse(s0, "q", ["foo"]) + self.evaluate_address(s0, "ex;{delete:always}") + try: + send0.send("bar") # Should fail, exchange is deleted. + self.fail("Expected not-found exception") + except qpid.messaging.NotFound: pass + self.assert_browse(cluster[0].connect().session(), "q", ["foo"]) + + def test_deleted_exchange_inconsistent(self): + """QPID-3215: cached exchange reference can cause cluster inconsistencies + if exchange is deleted/recreated + + Verify cluster inconsistency. + """ + cluster = self.cluster() + cluster.start() + s0 = cluster[0].connect().session() + self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}") + self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}") + send0 = s0.sender("ex/foo") + send0.send("foo") + self.assert_browse(s0, "q", ["foo"]) + + cluster.start() + s1 = cluster[1].connect().session() + self.evaluate_address(s0, "ex;{delete:always}") + try: + send0.send("bar") + self.fail("Expected not-found exception") + except qpid.messaging.NotFound: pass + + self.assert_browse(s1, "q", ["foo"]) + + +class LongTests(BrokerTest): + """Tests that can run for a long time if -DDURATION=<minutes> is set""" + def duration(self): + d = self.config.defines.get("DURATION") + if d: return float(d)*60 + else: return 3 # Default is to be quick + + def test_failover(self): + """Test fail-over during continuous send-receive with errors""" + + # Original cluster will all be killed so expect exit with failure + cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL) + for b in cluster: ErrorGenerator(b) + + # Start sender and receiver threads + cluster[0].declare_queue("test-queue") + sender = NumberedSender(cluster[1], 1000) # Max queue depth + receiver = NumberedReceiver(cluster[2], sender) + receiver.start() + sender.start() + + # Kill original brokers, start new ones for the duration. + endtime = time.time() + self.duration() + i = 0 + while time.time() < endtime: + cluster[i].kill() + i += 1 + b = cluster.start(expect=EXPECT_EXIT_FAIL) + ErrorGenerator(b) + time.sleep(5) + sender.stop() + receiver.stop() + for i in range(i, len(cluster)): cluster[i].kill() + + def test_management(self, args=[]): + """ + Stress test: Run management clients and other clients concurrently + while killing and restarting brokers. + """ + + class ClientLoop(StoppableThread): + """Run a client executable in a loop.""" + def __init__(self, broker, cmd): + StoppableThread.__init__(self) + self.broker=broker + self.cmd = cmd # Client command. + self.lock = Lock() + self.process = None # Client process. + self.start() + + def run(self): + try: + while True: + self.lock.acquire() + try: + if self.stopped: break + self.process = self.broker.test.popen( + self.cmd, expect=EXPECT_UNKNOWN) + finally: + self.lock.release() + try: + exit = self.process.wait() + except OSError, e: + # Process may already have been killed by self.stop() + break + except Exception, e: + self.process.unexpected( + "client of %s: %s"%(self.broker.name, e)) + self.lock.acquire() + try: + if self.stopped: break + if exit != 0: + self.process.unexpected( + "client of %s exit code %s"%(self.broker.name, exit)) + finally: + self.lock.release() + except Exception, e: + self.error = RethrownException("Error in ClientLoop.run") + + def stop(self): + """Stop the running client and wait for it to exit""" + self.lock.acquire() + try: + if self.stopped: return + self.stopped = True + if self.process: + try: self.process.kill() # Kill the client. + except OSError: pass # The client might not be running. + finally: self.lock.release() + StoppableThread.stop(self) + + # body of test_management() + + args += ["--mgmt-pub-interval", 1] + args += ["--log-enable=trace+:management"] + # Use store if present. + if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib] + cluster = self.cluster(3, args) + + clients = [] # Per-broker list of clients that only connect to one broker. + mclients = [] # Management clients that connect to every broker in the cluster. + + def start_clients(broker): + """Start ordinary clients for a broker.""" + cmds=[ + ["qpid-tool", "localhost:%s"%(broker.port())], + ["qpid-perftest", "--count=5000", "--durable=yes", + "--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()], + ["qpid-txtest", "--queue-base-name", "tx-%s"%str(qpid.datatypes.uuid4()), + "--port", broker.port()], + ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())], + ["testagent", "localhost", str(broker.port())] ] + clients.append([ClientLoop(broker, cmd) for cmd in cmds]) + + def start_mclients(broker): + """Start management clients that make multiple connections.""" + cmd = ["qpid-stat", "-b", "localhost:%s" %(broker.port())] + mclients.append(ClientLoop(broker, cmd)) + + endtime = time.time() + self.duration() + # For long duration, first run is a quarter of the duration. + runtime = max(5, self.duration() / 4.0) + alive = 0 # First live cluster member + for i in range(len(cluster)): start_clients(cluster[i]) + start_mclients(cluster[alive]) + + while time.time() < endtime: + time.sleep(runtime) + runtime = 5 # Remaining runs 5 seconds, frequent broker kills + for b in cluster[alive:]: b.ready() # Check if a broker crashed. + # Kill the first broker, expect the clients to fail. + b = cluster[alive] + b.expect = EXPECT_EXIT_FAIL + b.kill() + # Stop the brokers clients and all the mclients. + for c in clients[alive] + mclients: + try: c.stop() + except: pass # Ignore expected errors due to broker shutdown. + clients[alive] = [] + mclients = [] + # Start another broker and clients + alive += 1 + cluster.start() + start_clients(cluster[-1]) + start_mclients(cluster[alive]) + for c in chain(mclients, *clients): + c.stop() + # Verify that logs are consistent + cluster_test_logs.verify_logs() + + def test_management_qmf2(self): + self.test_management(args=["--mgmt-qmf2=yes"]) + + def test_connect_consistent(self): + args=["--mgmt-pub-interval=1","--log-enable=trace+:management"] + cluster = self.cluster(2, args=args) + end = time.time() + self.duration() + while (time.time() < end): # Get a management interval + for i in xrange(1000): cluster[0].connect().close() + cluster_test_logs.verify_logs() + + def test_flowlimit_failover(self): + """Test fail-over during continuous send-receive with flow control + active. + """ + + # Original cluster will all be killed so expect exit with failure + cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL) + #for b in cluster: ErrorGenerator(b) + + # create a queue with rather draconian flow control settings + ssn0 = cluster[0].connect().session() + s0 = ssn0.sender("test-queue; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':2000, 'qpid.flow_resume_count':100}}}}") + + receiver = NumberedReceiver(cluster[2]) + receiver.start() + senders = [NumberedSender(cluster[i]) for i in range(1,3)] + for s in senders: + s.start() + + # Kill original brokers, start new ones for the duration. + endtime = time.time() + self.duration(); + i = 0 + while time.time() < endtime: + cluster[i].kill() + i += 1 + b = cluster.start(expect=EXPECT_EXIT_FAIL) + #ErrorGenerator(b) + time.sleep(5) + #b = cluster[0] + #b.startQmf() + for s in senders: + s.stop() + receiver.stop() + for i in range(i, len(cluster)): cluster[i].kill() + + +class StoreTests(BrokerTest): + """ + Cluster tests that can only be run if there is a store available. + """ + def args(self): + assert BrokerTest.store_lib + return ["--load-module", BrokerTest.store_lib] + + def test_store_loaded(self): + """Ensure we are indeed loading a working store""" + broker = self.broker(self.args(), name="recoverme", expect=EXPECT_EXIT_FAIL) + m = Message("x", durable=True) + broker.send_message("q", m) + broker.kill() + broker = self.broker(self.args(), name="recoverme") + self.assertEqual("x", broker.get_message("q").content) + + def test_kill_restart(self): + """Verify we can kill/resetart a broker with store in a cluster""" + cluster = self.cluster(1, self.args()) + cluster.start("restartme", expect=EXPECT_EXIT_FAIL).kill() + + # Send a message, retrieve from the restarted broker + cluster[0].send_message("q", "x") + m = cluster.start("restartme").get_message("q") + self.assertEqual("x", m.content) + + def stop_cluster(self,broker): + """Clean shut-down of a cluster""" + self.assertEqual(0, qpid_cluster.main( + ["-kf", broker.host_port()])) + + def test_persistent_restart(self): + """Verify persistent cluster shutdown/restart scenarios""" + cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"]) + a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) + b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False) + c = cluster.start("c", expect=EXPECT_EXIT_FAIL, wait=True) + a.send_message("q", Message("1", durable=True)) + # Kill & restart one member. + c.kill() + self.assertEqual(a.get_message("q").content, "1") + a.send_message("q", Message("2", durable=True)) + c = cluster.start("c", expect=EXPECT_EXIT_OK) + self.assertEqual(c.get_message("q").content, "2") + # Shut down the entire cluster cleanly and bring it back up + a.send_message("q", Message("3", durable=True)) + self.stop_cluster(a) + a = cluster.start("a", wait=False) + b = cluster.start("b", wait=False) + c = cluster.start("c", wait=True) + self.assertEqual(a.get_message("q").content, "3") + + def test_persistent_partial_failure(self): + # Kill 2 members, shut down the last cleanly then restart + # Ensure we use the clean database + cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"]) + a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False) + b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False) + c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=True) + a.send_message("q", Message("4", durable=True)) + a.kill() + b.kill() + self.assertEqual(c.get_message("q").content, "4") + c.send_message("q", Message("clean", durable=True)) + self.stop_cluster(c) + a = cluster.start("a", wait=False) + b = cluster.start("b", wait=False) + c = cluster.start("c", wait=True) + self.assertEqual(a.get_message("q").content, "clean") + + def test_wrong_cluster_id(self): + # Start a cluster1 broker, then try to restart in cluster2 + cluster1 = self.cluster(0, args=self.args()) + a = cluster1.start("a", expect=EXPECT_EXIT_OK) + a.terminate() + cluster2 = self.cluster(1, args=self.args()) + try: + a = cluster2.start("a", expect=EXPECT_EXIT_FAIL) + a.ready() + self.fail("Expected exception") + except: pass + + def test_wrong_shutdown_id(self): + # Start 2 members and shut down. + cluster = self.cluster(0, args=self.args()+["--cluster-size=2"]) + a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) + b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False) + self.stop_cluster(a) + self.assertEqual(a.wait(), 0) + self.assertEqual(b.wait(), 0) + + # Restart with a different member and shut down. + a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) + c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=False) + self.stop_cluster(a) + self.assertEqual(a.wait(), 0) + self.assertEqual(c.wait(), 0) + # Mix members from both shutdown events, they should fail + # TODO aconway 2010-03-11: can't predict the exit status of these + # as it depends on the order of delivery of initial-status messages. + # See comment at top of this file. + a = cluster.start("a", expect=EXPECT_UNKNOWN, wait=False) + b = cluster.start("b", expect=EXPECT_UNKNOWN, wait=False) + self.assertRaises(Exception, lambda: a.ready()) + self.assertRaises(Exception, lambda: b.ready()) + + def test_solo_store_clean(self): + # A single node cluster should always leave a clean store. + cluster = self.cluster(0, self.args()) + a = cluster.start("a", expect=EXPECT_EXIT_FAIL) + a.send_message("q", Message("x", durable=True)) + a.kill() + a = cluster.start("a") + self.assertEqual(a.get_message("q").content, "x") + + def test_last_store_clean(self): + # Verify that only the last node in a cluster to shut down has + # a clean store. Start with cluster of 3, reduce to 1 then + # increase again to ensure that a node that was once alone but + # finally did not finish as the last node does not get a clean + # store. + cluster = self.cluster(0, self.args()) + a = cluster.start("a", expect=EXPECT_EXIT_FAIL) + self.assertEqual(a.store_state(), "clean") + b = cluster.start("b", expect=EXPECT_EXIT_FAIL) + c = cluster.start("c", expect=EXPECT_EXIT_FAIL) + self.assertEqual(b.store_state(), "dirty") + self.assertEqual(c.store_state(), "dirty") + retry(lambda: a.store_state() == "dirty") + + a.send_message("q", Message("x", durable=True)) + a.kill() + b.kill() # c is last man, will mark store clean + retry(lambda: c.store_state() == "clean") + a = cluster.start("a", expect=EXPECT_EXIT_FAIL) # c no longer last man + retry(lambda: c.store_state() == "dirty") + c.kill() # a is now last man + retry(lambda: a.store_state() == "clean") + a.kill() + self.assertEqual(a.store_state(), "clean") + self.assertEqual(b.store_state(), "dirty") + self.assertEqual(c.store_state(), "dirty") + + def test_restart_clean(self): + """Verify that we can re-start brokers one by one in a + persistent cluster after a clean oshutdown""" + cluster = self.cluster(0, self.args()) + a = cluster.start("a", expect=EXPECT_EXIT_OK) + b = cluster.start("b", expect=EXPECT_EXIT_OK) + c = cluster.start("c", expect=EXPECT_EXIT_OK) + a.send_message("q", Message("x", durable=True)) + self.stop_cluster(a) + a = cluster.start("a") + b = cluster.start("b") + c = cluster.start("c") + self.assertEqual(c.get_message("q").content, "x") + + def test_join_sub_size(self): + """Verify that after starting a cluster with cluster-size=N, + we can join new members even if size < N-1""" + cluster = self.cluster(0, self.args()+["--cluster-size=3"]) + a = cluster.start("a", wait=False, expect=EXPECT_EXIT_FAIL) + b = cluster.start("b", wait=False, expect=EXPECT_EXIT_FAIL) + c = cluster.start("c") + a.send_message("q", Message("x", durable=True)) + a.send_message("q", Message("y", durable=True)) + a.kill() + b.kill() + a = cluster.start("a") + self.assertEqual(c.get_message("q").content, "x") + b = cluster.start("b") + self.assertEqual(c.get_message("q").content, "y") |