diff options
Diffstat (limited to 'cpp/src/tests/cluster_tests.py')
-rwxr-xr-x | cpp/src/tests/cluster_tests.py | 1057 |
1 files changed, 0 insertions, 1057 deletions
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py deleted file mode 100755 index 593791297a..0000000000 --- a/cpp/src/tests/cluster_tests.py +++ /dev/null @@ -1,1057 +0,0 @@ -#!/usr/bin/env python - -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import os, signal, sys, time, imp, re, subprocess, glob, cluster_test_logs -from qpid import datatypes, messaging -from brokertest import * -from qpid.harness import Skipped -from qpid.messaging import Message, Empty, Disposition, REJECTED -from threading import Thread, Lock, Condition -from logging import getLogger -from itertools import chain -from tempfile import NamedTemporaryFile - -log = getLogger("qpid.cluster_tests") - -# Note: brokers that shut themselves down due to critical error during -# normal operation will still have an exit code of 0. Brokers that -# shut down because of an error found during initialize will exit with -# a non-0 code. Hence the apparently inconsistent use of EXPECT_EXIT_OK -# and EXPECT_EXIT_FAIL in some of the tests below. - -# TODO aconway 2010-03-11: resolve this - ideally any exit due to an error -# should give non-0 exit status. - -# Import scripts as modules -qpid_cluster=import_script(checkenv("QPID_CLUSTER_EXEC")) - -def readfile(filename): - """Returns te content of file named filename as a string""" - f = file(filename) - try: return f.read() - finally: f.close() - -class ShortTests(BrokerTest): - """Short cluster functionality tests.""" - - def test_message_replication(self): - """Test basic cluster message replication.""" - # Start a cluster, send some messages to member 0. - cluster = self.cluster(2) - s0 = cluster[0].connect().session() - s0.sender("q; {create:always}").send(Message("x")) - s0.sender("q; {create:always}").send(Message("y")) - s0.connection.close() - - # Verify messages available on member 1. - s1 = cluster[1].connect().session() - m = s1.receiver("q", capacity=1).fetch(timeout=1) - s1.acknowledge() - self.assertEqual("x", m.content) - s1.connection.close() - - # Start member 2 and verify messages available. - s2 = cluster.start().connect().session() - m = s2.receiver("q", capacity=1).fetch(timeout=1) - s2.acknowledge() - self.assertEqual("y", m.content) - s2.connection.close() - - def test_store_direct_update_match(self): - """Verify that brokers stores an identical message whether they receive it - direct from clients or during an update, no header or other differences""" - cluster = self.cluster(0, args=["--load-module", self.test_store_lib]) - cluster.start(args=["--test-store-dump", "direct.dump"]) - # Try messages with various headers - cluster[0].send_message("q", Message(durable=True, content="foobar", - subject="subject", - reply_to="reply_to", - properties={"n":10})) - # Try messages of different sizes - for size in range(0,10000,100): - cluster[0].send_message("q", Message(content="x"*size, durable=True)) - # Try sending via named exchange - c = cluster[0].connect_old() - s = c.session(str(qpid.datatypes.uuid4())) - s.exchange_bind(exchange="amq.direct", binding_key="foo", queue="q") - props = s.delivery_properties(routing_key="foo", delivery_mode=2) - s.message_transfer( - destination="amq.direct", - message=qpid.datatypes.Message(props, "content")) - - # Now update a new member and compare their dumps. - cluster.start(args=["--test-store-dump", "updatee.dump"]) - assert readfile("direct.dump") == readfile("updatee.dump") - os.remove("direct.dump") - os.remove("updatee.dump") - - def test_sasl(self): - """Test SASL authentication and encryption in a cluster""" - sasl_config=os.path.join(self.rootdir, "sasl_config") - acl=os.path.join(os.getcwd(), "policy.acl") - aclf=file(acl,"w") - aclf.write(""" -acl deny zag@QPID create queue -acl allow all all -""") - aclf.close() - cluster = self.cluster(2, args=["--auth", "yes", - "--sasl-config", sasl_config, - "--load-module", os.getenv("ACL_LIB"), - "--acl-file", acl]) - - # Valid user/password, ensure queue is created. - c = cluster[0].connect(username="zig", password="zig") - c.session().sender("ziggy;{create:always}") - c.close() - c = cluster[1].connect(username="zig", password="zig") - c.session().receiver("ziggy;{assert:always}") - c.close() - for b in cluster: b.ready() # Make sure all brokers still running. - - # Valid user, bad password - try: - cluster[0].connect(username="zig", password="foo").close() - self.fail("Expected exception") - except messaging.exceptions.ConnectionError: pass - for b in cluster: b.ready() # Make sure all brokers still running. - - # Bad user ID - try: - cluster[0].connect(username="foo", password="bar").close() - self.fail("Expected exception") - except messaging.exceptions.ConnectionError: pass - for b in cluster: b.ready() # Make sure all brokers still running. - - # Action disallowed by ACL - c = cluster[0].connect(username="zag", password="zag") - try: - s = c.session() - s.sender("zaggy;{create:always}") - s.close() - self.fail("Expected exception") - except messaging.exceptions.UnauthorizedAccess: pass - # make sure the queue was not created at the other node. - c = cluster[0].connect(username="zag", password="zag") - try: - s = c.session() - s.sender("zaggy;{assert:always}") - s.close() - self.fail("Expected exception") - except messaging.exceptions.NotFound: pass - - def test_user_id_update(self): - """Ensure that user-id of an open session is updated to new cluster members""" - sasl_config=os.path.join(self.rootdir, "sasl_config") - cluster = self.cluster(1, args=["--auth", "yes", "--sasl-config", sasl_config,]) - c = cluster[0].connect(username="zig", password="zig") - s = c.session().sender("q;{create:always}") - s.send(Message("x", user_id="zig")) # Message sent before start new broker - cluster.start() - s.send(Message("y", user_id="zig")) # Messsage sent after start of new broker - # Verify brokers are healthy and messages are on the queue. - self.assertEqual("x", cluster[0].get_message("q").content) - self.assertEqual("y", cluster[1].get_message("q").content) - - def test_link_events(self): - """Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=611543""" - args = ["--mgmt-pub-interval", 1] # Publish management information every second. - broker1 = self.cluster(1, args)[0] - broker2 = self.cluster(1, args)[0] - qp = self.popen(["qpid-printevents", broker1.host_port()], EXPECT_RUNNING) - qr = self.popen(["qpid-route", "route", "add", - broker1.host_port(), broker2.host_port(), - "amq.fanout", "key" - ], EXPECT_EXIT_OK) - # Look for link event in printevents output. - retry(lambda: find_in_file("brokerLinkUp", qp.outfile("out"))) - broker1.ready() - broker2.ready() - - def test_queue_cleaner(self): - """ Regression test to ensure that cleanup of expired messages works correctly """ - cluster = self.cluster(2, args=["--queue-purge-interval", 3]) - - s0 = cluster[0].connect().session() - sender = s0.sender("my-lvq; {create: always, node:{x-declare:{arguments:{'qpid.last_value_queue':1}}}}") - #send 10 messages that will all expire and be cleaned up - for i in range(1, 10): - msg = Message("message-%s" % i) - msg.properties["qpid.LVQ_key"] = "a" - msg.ttl = 0.1 - sender.send(msg) - #wait for queue cleaner to run - time.sleep(3) - - #test all is ok by sending and receiving a message - msg = Message("non-expiring") - msg.properties["qpid.LVQ_key"] = "b" - sender.send(msg) - s0.connection.close() - s1 = cluster[1].connect().session() - m = s1.receiver("my-lvq", capacity=1).fetch(timeout=1) - s1.acknowledge() - self.assertEqual("non-expiring", m.content) - s1.connection.close() - - for b in cluster: b.ready() # Make sure all brokers still running. - - - def test_amqfailover_visible(self): - """Verify that the amq.failover exchange can be seen by - QMF-based tools - regression test for BZ615300.""" - broker1 = self.cluster(1)[0] - broker2 = self.cluster(1)[0] - qs = subprocess.Popen(["qpid-stat", "-e", broker1.host_port()], stdout=subprocess.PIPE) - out = qs.communicate()[0] - assert out.find("amq.failover") > 0 - - def evaluate_address(self, session, address): - """Create a receiver just to evaluate an address for its side effects""" - r = session.receiver(address) - r.close() - - def test_expire_fanout(self): - """Regression test for QPID-2874: Clustered broker crashes in assertion in - cluster/ExpiryPolicy.cpp. - Caused by a fan-out message being updated as separate messages""" - cluster = self.cluster(1) - session0 = cluster[0].connect().session() - # Create 2 queues bound to fanout exchange. - self.evaluate_address(session0, "q1;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:q1}]}}") - self.evaluate_address(session0, "q2;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:q2}]}}") - queues = ["q1", "q2"] - # Send a fanout message with a long timeout - s = session0.sender("amq.fanout") - s.send(Message("foo", ttl=100), sync=False) - # Start a new member, check the messages - cluster.start() - session1 = cluster[1].connect().session() - for q in queues: self.assert_browse(session1, "q1", ["foo"]) - - def test_route_update(self): - """Regression test for https://issues.apache.org/jira/browse/QPID-2982 - Links and bridges associated with routes were not replicated on update. - This meant extra management objects and caused an exit if a management - client was attached. - """ - args=["--mgmt-pub-interval=1","--log-enable=trace+:management"] - cluster0 = self.cluster(1, args=args) - cluster1 = self.cluster(1, args=args) - assert 0 == subprocess.call( - ["qpid-route", "route", "add", cluster0[0].host_port(), - cluster1[0].host_port(), "dummy-exchange", "dummy-key", "-d"]) - cluster0.start() - - # Wait for qpid-tool:list on cluster0[0] to generate expected output. - pattern = re.compile("org.apache.qpid.broker.*link") - qpid_tool = subprocess.Popen(["qpid-tool", cluster0[0].host_port()], - stdin=subprocess.PIPE, stdout=subprocess.PIPE) - class Scanner(Thread): - def __init__(self): self.found = False; Thread.__init__(self) - def run(self): - for l in qpid_tool.stdout: - if pattern.search(l): self.found = True; return - scanner = Scanner() - scanner.start() - start = time.time() - try: - # Wait up to 5 second timeout for scanner to find expected output - while not scanner.found and time.time() < start + 5: - qpid_tool.stdin.write("list\n") # Ask qpid-tool to list - for b in cluster0: b.ready() # Raise if any brokers are down - finally: - qpid_tool.stdin.write("quit\n") - qpid_tool.wait() - scanner.join() - assert scanner.found - # Regression test for https://issues.apache.org/jira/browse/QPID-3235 - # Inconsistent stats when changing elder. - - # Force a change of elder - cluster0.start() - cluster0[0].kill() - time.sleep(2) # Allow a management interval to pass. - # Verify logs are consistent - cluster_test_logs.verify_logs() - - def test_redelivered(self): - """Verify that redelivered flag is set correctly on replayed messages""" - cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL) - url = "amqp:tcp:%s,tcp:%s" % (cluster[0].host_port(), cluster[1].host_port()) - queue = "my-queue" - cluster[0].declare_queue(queue) - self.sender = self.popen( - ["qpid-send", - "--broker", url, - "--address", queue, - "--sequence=true", - "--send-eos=1", - "--messages=100000", - "--connection-options={reconnect:true}" - ]) - self.receiver = self.popen( - ["qpid-receive", - "--broker", url, - "--address", queue, - "--ignore-duplicates", - "--check-redelivered", - "--connection-options={reconnect:true}", - "--forever" - ]) - time.sleep(1)#give sender enough time to have some messages to replay - cluster[0].kill() - self.sender.wait() - self.receiver.wait() - cluster[1].kill() - - class BlockedSend(Thread): - """Send a message, send is expected to block. - Verify that it does block (for a given timeout), then allow - waiting till it unblocks when it is expected to do so.""" - def __init__(self, sender, msg): - self.sender, self.msg = sender, msg - self.blocked = True - self.condition = Condition() - self.timeout = 0.1 # Time to wait for expected results. - Thread.__init__(self) - def run(self): - try: - self.sender.send(self.msg, sync=True) - self.condition.acquire() - try: - self.blocked = False - self.condition.notify() - finally: self.condition.release() - except Exception,e: print "BlockedSend exception: %s"%e - def start(self): - Thread.start(self) - time.sleep(self.timeout) - assert self.blocked # Expected to block - def assert_blocked(self): assert self.blocked - def wait(self): # Now expecting to unblock - self.condition.acquire() - try: - while self.blocked: - self.condition.wait(self.timeout) - if self.blocked: raise Exception("Timed out waiting for send to unblock") - finally: self.condition.release() - self.join() - - def queue_flowlimit_test(self, brokers): - """Verify that the queue's flowlimit configuration and state are - correctly replicated. - The brokers argument allows this test to run on single broker, - cluster of 2 pre-startd brokers or cluster where second broker - starts after queue is in flow control. - """ - # configure a queue with a specific flow limit on first broker - ssn0 = brokers.first().connect().session() - s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}") - brokers.first().startQmf() - q1 = [q for q in brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0] - oid = q1.getObjectId() - self.assertEqual(q1.name, "flq") - self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) - assert not q1.flowStopped - self.assertEqual(q1.flowStoppedCount, 0) - - # fill the queue on one broker until flow control is active - for x in range(5): s0.send(Message(str(x))) - sender = ShortTests.BlockedSend(s0, Message(str(6))) - sender.start() # Tests that sender does block - # Verify the broker queue goes into a flowStopped state - deadline = time.time() + 1 - while not q1.flowStopped and time.time() < deadline: q1.update() - assert q1.flowStopped - self.assertEqual(q1.flowStoppedCount, 1) - sender.assert_blocked() # Still blocked - - # Now verify the both brokers in cluster have same configuration - brokers.second().startQmf() - qs = brokers.second().qmf_session.getObjects(_objectId=oid) - self.assertEqual(len(qs), 1) - q2 = qs[0] - self.assertEqual(q2.name, "flq") - self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) - assert q2.flowStopped - self.assertEqual(q2.flowStoppedCount, 1) - - # now drain the queue using a session to the other broker - ssn1 = brokers.second().connect().session() - r1 = ssn1.receiver("flq", capacity=6) - for x in range(4): - r1.fetch(timeout=0) - ssn1.acknowledge() - sender.wait() # Verify no longer blocked. - - # and re-verify state of queue on both brokers - q1.update() - assert not q1.flowStopped - q2.update() - assert not q2.flowStopped - - ssn0.connection.close() - ssn1.connection.close() - cluster_test_logs.verify_logs() - - def test_queue_flowlimit(self): - """Test flow limits on a standalone broker""" - broker = self.broker() - class Brokers: - def first(self): return broker - def second(self): return broker - self.queue_flowlimit_test(Brokers()) - - def test_queue_flowlimit_cluster(self): - cluster = self.cluster(2) - class Brokers: - def first(self): return cluster[0] - def second(self): return cluster[1] - self.queue_flowlimit_test(Brokers()) - - def test_queue_flowlimit_cluster_join(self): - cluster = self.cluster(1) - class Brokers: - def first(self): return cluster[0] - def second(self): - if len(cluster) == 1: cluster.start() - return cluster[1] - self.queue_flowlimit_test(Brokers()) - - def test_queue_flowlimit_replicate(self): - """ Verify that a queue which is in flow control BUT has drained BELOW - the flow control 'stop' threshold, is correctly replicated when a new - broker is added to the cluster. - """ - - class AsyncSender(Thread): - """Send a fixed number of msgs from a sender in a separate thread - so it may block without blocking the test. - """ - def __init__(self, broker, address, count=1, size=4): - Thread.__init__(self) - self.daemon = True - self.broker = broker - self.queue = address - self.count = count - self.size = size - self.done = False - - def run(self): - self.sender = subprocess.Popen(["qpid-send", - "--capacity=1", - "--content-size=%s" % self.size, - "--messages=%s" % self.count, - "--failover-updates", - "--connection-options={reconnect:true}", - "--address=%s" % self.queue, - "--broker=%s" % self.broker.host_port()]) - self.sender.wait() - self.done = True - - cluster = self.cluster(2) - # create a queue with rather draconian flow control settings - ssn0 = cluster[0].connect().session() - s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':100, 'qpid.flow_resume_count':20}}}}") - - # fire off the sending thread to broker[0], and wait until the queue - # hits flow control on broker[1] - sender = AsyncSender(cluster[0], "flq", count=110); - sender.start(); - - cluster[1].startQmf() - q_obj = [q for q in cluster[1].qmf_session.getObjects(_class="queue") if q.name == "flq"][0] - deadline = time.time() + 10 - while not q_obj.flowStopped and time.time() < deadline: - q_obj.update() - assert q_obj.flowStopped - assert not sender.done - assert q_obj.msgDepth < 110 - - # Now drain enough messages on broker[1] to drop below the flow stop - # threshold, but not relieve flow control... - receiver = subprocess.Popen(["qpid-receive", - "--messages=15", - "--timeout=1", - "--print-content=no", - "--failover-updates", - "--connection-options={reconnect:true}", - "--ack-frequency=1", - "--address=flq", - "--broker=%s" % cluster[1].host_port()]) - receiver.wait() - q_obj.update() - assert q_obj.flowStopped - assert not sender.done - current_depth = q_obj.msgDepth - - # add a new broker to the cluster, and verify that the queue is in flow - # control on that broker - cluster.start() - cluster[2].startQmf() - q_obj = [q for q in cluster[2].qmf_session.getObjects(_class="queue") if q.name == "flq"][0] - assert q_obj.flowStopped - assert q_obj.msgDepth == current_depth - - # now drain the queue on broker[2], and verify that the sender becomes - # unblocked - receiver = subprocess.Popen(["qpid-receive", - "--messages=95", - "--timeout=1", - "--print-content=no", - "--failover-updates", - "--connection-options={reconnect:true}", - "--ack-frequency=1", - "--address=flq", - "--broker=%s" % cluster[2].host_port()]) - receiver.wait() - q_obj.update() - assert not q_obj.flowStopped - assert q_obj.msgDepth == 0 - - # verify that the sender has become unblocked - sender.join(timeout=5) - assert not sender.isAlive() - assert sender.done - - def test_blocked_queue_delete(self): - """Verify that producers which are blocked on a queue due to flow - control are unblocked when that queue is deleted. - """ - - cluster = self.cluster(2) - cluster[0].startQmf() - cluster[1].startQmf() - - # configure a queue with a specific flow limit on first broker - ssn0 = cluster[0].connect().session() - s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}") - q1 = [q for q in cluster[0].qmf_session.getObjects(_class="queue") if q.name == "flq"][0] - oid = q1.getObjectId() - self.assertEqual(q1.name, "flq") - self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) - assert not q1.flowStopped - self.assertEqual(q1.flowStoppedCount, 0) - - # fill the queue on one broker until flow control is active - for x in range(5): s0.send(Message(str(x))) - sender = ShortTests.BlockedSend(s0, Message(str(6))) - sender.start() # Tests that sender does block - # Verify the broker queue goes into a flowStopped state - deadline = time.time() + 1 - while not q1.flowStopped and time.time() < deadline: q1.update() - assert q1.flowStopped - self.assertEqual(q1.flowStoppedCount, 1) - sender.assert_blocked() # Still blocked - - # Now verify the both brokers in cluster have same configuration - qs = cluster[1].qmf_session.getObjects(_objectId=oid) - self.assertEqual(len(qs), 1) - q2 = qs[0] - self.assertEqual(q2.name, "flq") - self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) - assert q2.flowStopped - self.assertEqual(q2.flowStoppedCount, 1) - - # now delete the blocked queue from other broker - ssn1 = cluster[1].connect().session() - self.evaluate_address(ssn1, "flq;{delete:always}") - sender.wait() # Verify no longer blocked. - - ssn0.connection.close() - ssn1.connection.close() - cluster_test_logs.verify_logs() - - - def test_alternate_exchange_update(self): - """Verify that alternate-exchange on exchanges and queues is propagated to new members of a cluster. """ - cluster = self.cluster(1) - s0 = cluster[0].connect().session() - # create alt queue bound to amq.fanout exchange, will be destination for alternate exchanges - self.evaluate_address(s0, "alt;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:alt}]}}") - # create direct exchange ex with alternate-exchange amq.fanout and no queues bound - self.evaluate_address(s0, "ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'amq.fanout'}}}") - # create queue q with alternate-exchange amq.fanout - self.evaluate_address(s0, "q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'amq.fanout'}}}") - - def verify(broker): - s = broker.connect().session() - # Verify unmatched message goes to ex's alternate. - s.sender("ex").send("foo") - self.assertEqual("foo", s.receiver("alt").fetch(timeout=0).content) - # Verify rejected message goes to q's alternate. - s.sender("q").send("bar") - msg = s.receiver("q").fetch(timeout=0) - self.assertEqual("bar", msg.content) - s.acknowledge(msg, Disposition(REJECTED)) # Reject the message - self.assertEqual("bar", s.receiver("alt").fetch(timeout=0).content) - - verify(cluster[0]) - cluster.start() - verify(cluster[1]) - - def test_binding_order(self): - """Regression test for binding order inconsistency in cluster""" - cluster = self.cluster(1) - c0 = cluster[0].connect() - s0 = c0.session() - # Declare multiple queues bound to same key on amq.topic - def declare(q,max=0): - if max: declare = 'x-declare:{arguments:{"qpid.max_count":%d, "qpid.flow_stop_count":0}}'%max - else: declare = 'x-declare:{}' - bind='x-bindings:[{queue:%s,key:key,exchange:"amq.topic"}]'%(q) - s0.sender("%s;{create:always,node:{%s,%s}}" % (q,declare,bind)) - declare('d',max=4) # Only one with a limit - for q in ['c', 'b','a']: declare(q) - # Add a cluster member, send enough messages to exceed the max count - cluster.start() - try: - s = s0.sender('amq.topic/key') - for m in xrange(1,6): s.send(Message(str(m))) - self.fail("Expected capacity exceeded exception") - except messaging.exceptions.TargetCapacityExceeded: pass - c1 = cluster[1].connect() - s1 = c1.session() - s0 = c0.session() # Old session s0 is broken by exception. - # Verify queue contents are consistent. - for q in ['a','b','c','d']: - self.assertEqual(self.browse(s0, q), self.browse(s1, q)) - # Verify queue contents are "best effort" - for q in ['a','b','c']: self.assert_browse(s1,q,[str(n) for n in xrange(1,6)]) - self.assert_browse(s1,'d',[str(n) for n in xrange(1,5)]) - - def test_deleted_exchange(self): - """QPID-3215: cached exchange reference can cause cluster inconsistencies - if exchange is deleted/recreated - Verify stand-alone case - """ - cluster = self.cluster() - # Verify we do not route message via an exchange that has been destroyed. - cluster.start() - s0 = cluster[0].connect().session() - self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}") - self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}") - send0 = s0.sender("ex/foo") - send0.send("foo") - self.assert_browse(s0, "q", ["foo"]) - self.evaluate_address(s0, "ex;{delete:always}") - try: - send0.send("bar") # Should fail, exchange is deleted. - self.fail("Expected not-found exception") - except qpid.messaging.NotFound: pass - self.assert_browse(cluster[0].connect().session(), "q", ["foo"]) - - def test_deleted_exchange_inconsistent(self): - """QPID-3215: cached exchange reference can cause cluster inconsistencies - if exchange is deleted/recreated - - Verify cluster inconsistency. - """ - cluster = self.cluster() - cluster.start() - s0 = cluster[0].connect().session() - self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}") - self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}") - send0 = s0.sender("ex/foo") - send0.send("foo") - self.assert_browse(s0, "q", ["foo"]) - - cluster.start() - s1 = cluster[1].connect().session() - self.evaluate_address(s0, "ex;{delete:always}") - try: - send0.send("bar") - self.fail("Expected not-found exception") - except qpid.messaging.NotFound: pass - - self.assert_browse(s1, "q", ["foo"]) - - -class LongTests(BrokerTest): - """Tests that can run for a long time if -DDURATION=<minutes> is set""" - def duration(self): - d = self.config.defines.get("DURATION") - if d: return float(d)*60 - else: return 3 # Default is to be quick - - def test_failover(self): - """Test fail-over during continuous send-receive with errors""" - - # Original cluster will all be killed so expect exit with failure - cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL) - for b in cluster: ErrorGenerator(b) - - # Start sender and receiver threads - cluster[0].declare_queue("test-queue") - sender = NumberedSender(cluster[1], 1000) # Max queue depth - receiver = NumberedReceiver(cluster[2], sender) - receiver.start() - sender.start() - - # Kill original brokers, start new ones for the duration. - endtime = time.time() + self.duration() - i = 0 - while time.time() < endtime: - cluster[i].kill() - i += 1 - b = cluster.start(expect=EXPECT_EXIT_FAIL) - ErrorGenerator(b) - time.sleep(5) - sender.stop() - receiver.stop() - for i in range(i, len(cluster)): cluster[i].kill() - - def test_management(self, args=[]): - """ - Stress test: Run management clients and other clients concurrently - while killing and restarting brokers. - """ - - class ClientLoop(StoppableThread): - """Run a client executable in a loop.""" - def __init__(self, broker, cmd): - StoppableThread.__init__(self) - self.broker=broker - self.cmd = cmd # Client command. - self.lock = Lock() - self.process = None # Client process. - self.start() - - def run(self): - try: - while True: - self.lock.acquire() - try: - if self.stopped: break - self.process = self.broker.test.popen( - self.cmd, expect=EXPECT_UNKNOWN) - finally: - self.lock.release() - try: - exit = self.process.wait() - except OSError, e: - # Process may already have been killed by self.stop() - break - except Exception, e: - self.process.unexpected( - "client of %s: %s"%(self.broker.name, e)) - self.lock.acquire() - try: - if self.stopped: break - if exit != 0: - self.process.unexpected( - "client of %s exit code %s"%(self.broker.name, exit)) - finally: - self.lock.release() - except Exception, e: - self.error = RethrownException("Error in ClientLoop.run") - - def stop(self): - """Stop the running client and wait for it to exit""" - self.lock.acquire() - try: - if self.stopped: return - self.stopped = True - if self.process: - try: self.process.kill() # Kill the client. - except OSError: pass # The client might not be running. - finally: self.lock.release() - StoppableThread.stop(self) - - # body of test_management() - - args += ["--mgmt-pub-interval", 1] - args += ["--log-enable=trace+:management"] - # Use store if present. - if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib] - cluster = self.cluster(3, args) - - clients = [] # Per-broker list of clients that only connect to one broker. - mclients = [] # Management clients that connect to every broker in the cluster. - - def start_clients(broker): - """Start ordinary clients for a broker.""" - cmds=[ - ["qpid-tool", "localhost:%s"%(broker.port())], - ["qpid-perftest", "--count=5000", "--durable=yes", - "--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()], - ["qpid-txtest", "--queue-base-name", "tx-%s"%str(qpid.datatypes.uuid4()), - "--port", broker.port()], - ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())], - ["testagent", "localhost", str(broker.port())] ] - clients.append([ClientLoop(broker, cmd) for cmd in cmds]) - - def start_mclients(broker): - """Start management clients that make multiple connections.""" - cmd = ["qpid-stat", "-b", "localhost:%s" %(broker.port())] - mclients.append(ClientLoop(broker, cmd)) - - endtime = time.time() + self.duration() - # For long duration, first run is a quarter of the duration. - runtime = max(5, self.duration() / 4.0) - alive = 0 # First live cluster member - for i in range(len(cluster)): start_clients(cluster[i]) - start_mclients(cluster[alive]) - - while time.time() < endtime: - time.sleep(runtime) - runtime = 5 # Remaining runs 5 seconds, frequent broker kills - for b in cluster[alive:]: b.ready() # Check if a broker crashed. - # Kill the first broker, expect the clients to fail. - b = cluster[alive] - b.expect = EXPECT_EXIT_FAIL - b.kill() - # Stop the brokers clients and all the mclients. - for c in clients[alive] + mclients: - try: c.stop() - except: pass # Ignore expected errors due to broker shutdown. - clients[alive] = [] - mclients = [] - # Start another broker and clients - alive += 1 - cluster.start() - start_clients(cluster[-1]) - start_mclients(cluster[alive]) - for c in chain(mclients, *clients): - c.stop() - # Verify that logs are consistent - cluster_test_logs.verify_logs() - - def test_management_qmf2(self): - self.test_management(args=["--mgmt-qmf2=yes"]) - - def test_connect_consistent(self): - args=["--mgmt-pub-interval=1","--log-enable=trace+:management"] - cluster = self.cluster(2, args=args) - end = time.time() + self.duration() - while (time.time() < end): # Get a management interval - for i in xrange(1000): cluster[0].connect().close() - cluster_test_logs.verify_logs() - - def test_flowlimit_failover(self): - """Test fail-over during continuous send-receive with flow control - active. - """ - - # Original cluster will all be killed so expect exit with failure - cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL) - #for b in cluster: ErrorGenerator(b) - - # create a queue with rather draconian flow control settings - ssn0 = cluster[0].connect().session() - s0 = ssn0.sender("test-queue; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':2000, 'qpid.flow_resume_count':100}}}}") - - receiver = NumberedReceiver(cluster[2]) - receiver.start() - senders = [NumberedSender(cluster[i]) for i in range(1,3)] - for s in senders: - s.start() - - # Kill original brokers, start new ones for the duration. - endtime = time.time() + self.duration(); - i = 0 - while time.time() < endtime: - cluster[i].kill() - i += 1 - b = cluster.start(expect=EXPECT_EXIT_FAIL) - #ErrorGenerator(b) - time.sleep(5) - #b = cluster[0] - #b.startQmf() - for s in senders: - s.stop() - receiver.stop() - for i in range(i, len(cluster)): cluster[i].kill() - - -class StoreTests(BrokerTest): - """ - Cluster tests that can only be run if there is a store available. - """ - def args(self): - assert BrokerTest.store_lib - return ["--load-module", BrokerTest.store_lib] - - def test_store_loaded(self): - """Ensure we are indeed loading a working store""" - broker = self.broker(self.args(), name="recoverme", expect=EXPECT_EXIT_FAIL) - m = Message("x", durable=True) - broker.send_message("q", m) - broker.kill() - broker = self.broker(self.args(), name="recoverme") - self.assertEqual("x", broker.get_message("q").content) - - def test_kill_restart(self): - """Verify we can kill/resetart a broker with store in a cluster""" - cluster = self.cluster(1, self.args()) - cluster.start("restartme", expect=EXPECT_EXIT_FAIL).kill() - - # Send a message, retrieve from the restarted broker - cluster[0].send_message("q", "x") - m = cluster.start("restartme").get_message("q") - self.assertEqual("x", m.content) - - def stop_cluster(self,broker): - """Clean shut-down of a cluster""" - self.assertEqual(0, qpid_cluster.main( - ["-kf", broker.host_port()])) - - def test_persistent_restart(self): - """Verify persistent cluster shutdown/restart scenarios""" - cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"]) - a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) - b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False) - c = cluster.start("c", expect=EXPECT_EXIT_FAIL, wait=True) - a.send_message("q", Message("1", durable=True)) - # Kill & restart one member. - c.kill() - self.assertEqual(a.get_message("q").content, "1") - a.send_message("q", Message("2", durable=True)) - c = cluster.start("c", expect=EXPECT_EXIT_OK) - self.assertEqual(c.get_message("q").content, "2") - # Shut down the entire cluster cleanly and bring it back up - a.send_message("q", Message("3", durable=True)) - self.stop_cluster(a) - a = cluster.start("a", wait=False) - b = cluster.start("b", wait=False) - c = cluster.start("c", wait=True) - self.assertEqual(a.get_message("q").content, "3") - - def test_persistent_partial_failure(self): - # Kill 2 members, shut down the last cleanly then restart - # Ensure we use the clean database - cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"]) - a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False) - b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False) - c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=True) - a.send_message("q", Message("4", durable=True)) - a.kill() - b.kill() - self.assertEqual(c.get_message("q").content, "4") - c.send_message("q", Message("clean", durable=True)) - self.stop_cluster(c) - a = cluster.start("a", wait=False) - b = cluster.start("b", wait=False) - c = cluster.start("c", wait=True) - self.assertEqual(a.get_message("q").content, "clean") - - def test_wrong_cluster_id(self): - # Start a cluster1 broker, then try to restart in cluster2 - cluster1 = self.cluster(0, args=self.args()) - a = cluster1.start("a", expect=EXPECT_EXIT_OK) - a.terminate() - cluster2 = self.cluster(1, args=self.args()) - try: - a = cluster2.start("a", expect=EXPECT_EXIT_FAIL) - a.ready() - self.fail("Expected exception") - except: pass - - def test_wrong_shutdown_id(self): - # Start 2 members and shut down. - cluster = self.cluster(0, args=self.args()+["--cluster-size=2"]) - a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) - b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False) - self.stop_cluster(a) - self.assertEqual(a.wait(), 0) - self.assertEqual(b.wait(), 0) - - # Restart with a different member and shut down. - a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) - c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=False) - self.stop_cluster(a) - self.assertEqual(a.wait(), 0) - self.assertEqual(c.wait(), 0) - # Mix members from both shutdown events, they should fail - # TODO aconway 2010-03-11: can't predict the exit status of these - # as it depends on the order of delivery of initial-status messages. - # See comment at top of this file. - a = cluster.start("a", expect=EXPECT_UNKNOWN, wait=False) - b = cluster.start("b", expect=EXPECT_UNKNOWN, wait=False) - self.assertRaises(Exception, lambda: a.ready()) - self.assertRaises(Exception, lambda: b.ready()) - - def test_solo_store_clean(self): - # A single node cluster should always leave a clean store. - cluster = self.cluster(0, self.args()) - a = cluster.start("a", expect=EXPECT_EXIT_FAIL) - a.send_message("q", Message("x", durable=True)) - a.kill() - a = cluster.start("a") - self.assertEqual(a.get_message("q").content, "x") - - def test_last_store_clean(self): - # Verify that only the last node in a cluster to shut down has - # a clean store. Start with cluster of 3, reduce to 1 then - # increase again to ensure that a node that was once alone but - # finally did not finish as the last node does not get a clean - # store. - cluster = self.cluster(0, self.args()) - a = cluster.start("a", expect=EXPECT_EXIT_FAIL) - self.assertEqual(a.store_state(), "clean") - b = cluster.start("b", expect=EXPECT_EXIT_FAIL) - c = cluster.start("c", expect=EXPECT_EXIT_FAIL) - self.assertEqual(b.store_state(), "dirty") - self.assertEqual(c.store_state(), "dirty") - retry(lambda: a.store_state() == "dirty") - - a.send_message("q", Message("x", durable=True)) - a.kill() - b.kill() # c is last man, will mark store clean - retry(lambda: c.store_state() == "clean") - a = cluster.start("a", expect=EXPECT_EXIT_FAIL) # c no longer last man - retry(lambda: c.store_state() == "dirty") - c.kill() # a is now last man - retry(lambda: a.store_state() == "clean") - a.kill() - self.assertEqual(a.store_state(), "clean") - self.assertEqual(b.store_state(), "dirty") - self.assertEqual(c.store_state(), "dirty") - - def test_restart_clean(self): - """Verify that we can re-start brokers one by one in a - persistent cluster after a clean oshutdown""" - cluster = self.cluster(0, self.args()) - a = cluster.start("a", expect=EXPECT_EXIT_OK) - b = cluster.start("b", expect=EXPECT_EXIT_OK) - c = cluster.start("c", expect=EXPECT_EXIT_OK) - a.send_message("q", Message("x", durable=True)) - self.stop_cluster(a) - a = cluster.start("a") - b = cluster.start("b") - c = cluster.start("c") - self.assertEqual(c.get_message("q").content, "x") - - def test_join_sub_size(self): - """Verify that after starting a cluster with cluster-size=N, - we can join new members even if size < N-1""" - cluster = self.cluster(0, self.args()+["--cluster-size=3"]) - a = cluster.start("a", wait=False, expect=EXPECT_EXIT_FAIL) - b = cluster.start("b", wait=False, expect=EXPECT_EXIT_FAIL) - c = cluster.start("c") - a.send_message("q", Message("x", durable=True)) - a.send_message("q", Message("y", durable=True)) - a.kill() - b.kill() - a = cluster.start("a") - self.assertEqual(c.get_message("q").content, "x") - b = cluster.start("b") - self.assertEqual(c.get_message("q").content, "y") |