summaryrefslogtreecommitdiff
path: root/cpp/src/tests/cluster_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/cluster_tests.py')
-rwxr-xr-xcpp/src/tests/cluster_tests.py1834
1 files changed, 0 insertions, 1834 deletions
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
deleted file mode 100755
index 3c96b252df..0000000000
--- a/cpp/src/tests/cluster_tests.py
+++ /dev/null
@@ -1,1834 +0,0 @@
-#!/usr/bin/env python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import os, signal, sys, time, imp, re, subprocess, glob, random, logging
-import cluster_test_logs
-from qpid import datatypes, messaging
-from brokertest import *
-from qpid.harness import Skipped
-from qpid.messaging import Message, Empty, Disposition, REJECTED, util
-from threading import Thread, Lock, Condition
-from logging import getLogger
-from itertools import chain
-from tempfile import NamedTemporaryFile
-
-log = getLogger("qpid.cluster_tests")
-
-# Note: brokers that shut themselves down due to critical error during
-# normal operation will still have an exit code of 0. Brokers that
-# shut down because of an error found during initialize will exit with
-# a non-0 code. Hence the apparently inconsistent use of EXPECT_EXIT_OK
-# and EXPECT_EXIT_FAIL in some of the tests below.
-
-# TODO aconway 2010-03-11: resolve this - ideally any exit due to an error
-# should give non-0 exit status.
-
-# Import scripts as modules
-qpid_cluster=import_script(checkenv("QPID_CLUSTER_EXEC"))
-
-def readfile(filename):
- """Returns te content of file named filename as a string"""
- f = file(filename)
- try: return f.read()
- finally: f.close()
-
-class ShortTests(BrokerTest):
- """Short cluster functionality tests."""
-
- def test_message_replication(self):
- """Test basic cluster message replication."""
- # Start a cluster, send some messages to member 0.
- cluster = self.cluster(2)
- s0 = cluster[0].connect().session()
- s0.sender("q; {create:always}").send(Message("x"))
- s0.sender("q; {create:always}").send(Message("y"))
- s0.connection.close()
-
- # Verify messages available on member 1.
- s1 = cluster[1].connect().session()
- m = s1.receiver("q", capacity=1).fetch(timeout=1)
- s1.acknowledge()
- self.assertEqual("x", m.content)
- s1.connection.close()
-
- # Start member 2 and verify messages available.
- s2 = cluster.start().connect().session()
- m = s2.receiver("q", capacity=1).fetch(timeout=1)
- s2.acknowledge()
- self.assertEqual("y", m.content)
- s2.connection.close()
-
- def test_store_direct_update_match(self):
- """Verify that brokers stores an identical message whether they receive it
- direct from clients or during an update, no header or other differences"""
- cluster = self.cluster(0, args=["--load-module", self.test_store_lib])
- cluster.start(args=["--test-store-dump", "direct.dump"])
- # Try messages with various headers
- cluster[0].send_message("q", Message(durable=True, content="foobar",
- subject="subject",
- reply_to="reply_to",
- properties={"n":10}))
- # Try messages of different sizes
- for size in range(0,10000,100):
- cluster[0].send_message("q", Message(content="x"*size, durable=True))
- # Try sending via named exchange
- c = cluster[0].connect_old()
- s = c.session(str(qpid.datatypes.uuid4()))
- s.exchange_bind(exchange="amq.direct", binding_key="foo", queue="q")
- props = s.delivery_properties(routing_key="foo", delivery_mode=2)
- s.message_transfer(
- destination="amq.direct",
- message=qpid.datatypes.Message(props, "content"))
-
- # Try message with TTL and differnet headers/properties
- cluster[0].send_message("q", Message(durable=True, ttl=100000))
- cluster[0].send_message("q", Message(durable=True, properties={}, ttl=100000))
- cluster[0].send_message("q", Message(durable=True, properties={"x":10}, ttl=100000))
-
- # Now update a new member and compare their dumps.
- cluster.start(args=["--test-store-dump", "updatee.dump"])
- assert readfile("direct.dump") == readfile("updatee.dump")
-
- os.remove("direct.dump")
- os.remove("updatee.dump")
-
- def test_sasl(self):
- """Test SASL authentication and encryption in a cluster"""
- sasl_config=os.path.join(self.rootdir, "sasl_config")
- acl=os.path.join(os.getcwd(), "policy.acl")
- aclf=file(acl,"w")
- # Must allow cluster-user (zag) access to credentials exchange.
- aclf.write("""
-acl allow zag@QPID publish exchange name=qpid.cluster-credentials
-acl allow zig@QPID all all
-acl deny all all
-""")
- aclf.close()
- cluster = self.cluster(1, args=["--auth", "yes",
- "--sasl-config", sasl_config,
- "--load-module", os.getenv("ACL_LIB"),
- "--acl-file", acl,
- "--cluster-username=zag",
- "--cluster-password=zag",
- "--cluster-mechanism=PLAIN"
- ])
-
- # Valid user/password, ensure queue is created.
- c = cluster[0].connect(username="zig", password="zig")
- c.session().sender("ziggy;{create:always,node:{x-declare:{exclusive:true}}}")
- c.close()
- cluster.start() # Start second node.
-
- # Check queue is created on second node.
- c = cluster[1].connect(username="zig", password="zig")
- c.session().receiver("ziggy;{assert:always}")
- c.close()
- for b in cluster: b.ready() # Make sure all brokers still running.
-
- # Valid user, bad password
- try:
- cluster[0].connect(username="zig", password="foo").close()
- self.fail("Expected exception")
- except messaging.exceptions.ConnectionError: pass
- for b in cluster: b.ready() # Make sure all brokers still running.
-
- # Bad user ID
- try:
- cluster[0].connect(username="foo", password="bar").close()
- self.fail("Expected exception")
- except messaging.exceptions.ConnectionError: pass
- for b in cluster: b.ready() # Make sure all brokers still running.
-
- # Action disallowed by ACL
- c = cluster[0].connect(username="zag", password="zag")
- try:
- s = c.session()
- s.sender("zaggy;{create:always}")
- s.close()
- self.fail("Expected exception")
- except messaging.exceptions.UnauthorizedAccess: pass
- # make sure the queue was not created at the other node.
- c = cluster[1].connect(username="zig", password="zig")
- try:
- s = c.session()
- s.sender("zaggy;{assert:always}")
- s.close()
- self.fail("Expected exception")
- except messaging.exceptions.NotFound: pass
-
- def test_sasl_join_good(self):
- """Verify SASL authentication between brokers when joining a cluster."""
- sasl_config=os.path.join(self.rootdir, "sasl_config")
- # Test with a valid username/password
- cluster = self.cluster(1, args=["--auth", "yes",
- "--sasl-config", sasl_config,
- "--cluster-username=zig",
- "--cluster-password=zig",
- "--cluster-mechanism=PLAIN"
- ])
- cluster.start()
- c = cluster[1].connect(username="zag", password="zag", mechanism="PLAIN")
-
- def test_sasl_join_bad_password(self):
- # Test with an invalid password
- cluster = self.cluster(1, args=["--auth", "yes",
- "--sasl-config", os.path.join(self.rootdir, "sasl_config"),
- "--cluster-username=zig",
- "--cluster-password=bad",
- "--cluster-mechanism=PLAIN"
- ])
- cluster.start(wait=False, expect=EXPECT_EXIT_FAIL)
- assert cluster[1].log_contains("critical Unexpected error: connection-forced: Authentication failed")
-
- def test_sasl_join_wrong_user(self):
- # Test with a valid user that is not the cluster user.
- cluster = self.cluster(0, args=["--auth", "yes",
- "--sasl-config", os.path.join(self.rootdir, "sasl_config")])
- cluster.start(args=["--cluster-username=zig",
- "--cluster-password=zig",
- "--cluster-mechanism=PLAIN"
- ])
-
- cluster.start(wait=False, expect=EXPECT_EXIT_FAIL,
- args=["--cluster-username=zag",
- "--cluster-password=zag",
- "--cluster-mechanism=PLAIN"
- ])
- assert cluster[1].log_contains("critical Unexpected error: unauthorized-access: unauthorized-access: Unauthorized user zag@QPID for qpid.cluster-credentials, should be zig")
-
- def test_user_id_update(self):
- """Ensure that user-id of an open session is updated to new cluster members"""
- sasl_config=os.path.join(self.rootdir, "sasl_config")
- cluster = self.cluster(1, args=["--auth", "yes", "--sasl-config", sasl_config,
- "--cluster-mechanism=ANONYMOUS"])
- c = cluster[0].connect(username="zig", password="zig")
- s = c.session().sender("q;{create:always}")
- s.send(Message("x", user_id="zig")) # Message sent before start new broker
- cluster.start()
- s.send(Message("y", user_id="zig")) # Messsage sent after start of new broker
- # Verify brokers are healthy and messages are on the queue.
- self.assertEqual("x", cluster[0].get_message("q").content)
- self.assertEqual("y", cluster[1].get_message("q").content)
-
- def test_other_mech(self):
- """Test using a mechanism other than PLAIN/ANONYMOUS for cluster update authentication.
- Regression test for https://issues.apache.org/jira/browse/QPID-3849"""
- sasl_config=os.path.join(self.rootdir, "sasl_config")
- cluster = self.cluster(2, args=["--auth", "yes", "--sasl-config", sasl_config,
- "--cluster-username=zig",
- "--cluster-password=zig",
- "--cluster-mechanism=DIGEST-MD5"])
- cluster[0].connect()
- cluster.start() # Before the fix this broker falied to join the cluster.
- cluster[2].connect()
-
- def test_link_events(self):
- """Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=611543"""
- args = ["--mgmt-pub-interval", 1] # Publish management information every second.
- broker1 = self.cluster(1, args)[0]
- broker2 = self.cluster(1, args)[0]
- qp = self.popen(["qpid-printevents", broker1.host_port()], EXPECT_RUNNING)
- qr = self.popen(["qpid-route", "route", "add",
- broker1.host_port(), broker2.host_port(),
- "amq.fanout", "key"
- ], EXPECT_EXIT_OK)
- # Look for link event in printevents output.
- retry(lambda: find_in_file("brokerLinkUp", qp.outfile("out")))
- broker1.ready()
- broker2.ready()
- qr.wait()
-
- def test_queue_cleaner(self):
- """ Regression test to ensure that cleanup of expired messages works correctly """
- cluster = self.cluster(2, args=["--queue-purge-interval", 3])
-
- s0 = cluster[0].connect().session()
- sender = s0.sender("my-lvq; {create: always, node:{x-declare:{arguments:{'qpid.last_value_queue':1}}}}")
- #send 10 messages that will all expire and be cleaned up
- for i in range(1, 10):
- msg = Message("message-%s" % i)
- msg.properties["qpid.LVQ_key"] = "a"
- msg.ttl = 0.1
- sender.send(msg)
- #wait for queue cleaner to run
- time.sleep(3)
-
- #test all is ok by sending and receiving a message
- msg = Message("non-expiring")
- msg.properties["qpid.LVQ_key"] = "b"
- sender.send(msg)
- s0.connection.close()
- s1 = cluster[1].connect().session()
- m = s1.receiver("my-lvq", capacity=1).fetch(timeout=1)
- s1.acknowledge()
- self.assertEqual("non-expiring", m.content)
- s1.connection.close()
-
- for b in cluster: b.ready() # Make sure all brokers still running.
-
-
- def test_amqfailover_visible(self):
- """Verify that the amq.failover exchange can be seen by
- QMF-based tools - regression test for BZ615300."""
- broker1 = self.cluster(1)[0]
- broker2 = self.cluster(1)[0]
- qs = subprocess.Popen(["qpid-stat", "-e", "-b", broker1.host_port()], stdout=subprocess.PIPE)
- out = qs.communicate()[0]
- assert out.find("amq.failover") > 0
-
- def evaluate_address(self, session, address):
- """Create a receiver just to evaluate an address for its side effects"""
- r = session.receiver(address)
- r.close()
-
- def test_expire_fanout(self):
- """Regression test for QPID-2874: Clustered broker crashes in assertion in
- cluster/ExpiryPolicy.cpp.
- Caused by a fan-out message being updated as separate messages"""
- cluster = self.cluster(1)
- session0 = cluster[0].connect().session()
- # Create 2 queues bound to fanout exchange.
- self.evaluate_address(session0, "q1;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:q1}]}}")
- self.evaluate_address(session0, "q2;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:q2}]}}")
- queues = ["q1", "q2"]
- # Send a fanout message with a long timeout
- s = session0.sender("amq.fanout")
- s.send(Message("foo", ttl=100), sync=False)
- # Start a new member, check the messages
- cluster.start()
- session1 = cluster[1].connect().session()
- for q in queues: self.assert_browse(session1, "q1", ["foo"])
-
- def test_route_update(self):
- """Regression test for https://issues.apache.org/jira/browse/QPID-2982
- Links and bridges associated with routes were not replicated on update.
- This meant extra management objects and caused an exit if a management
- client was attached.
- """
- args=["--mgmt-pub-interval=1","--log-enable=trace+:management"]
-
- # First broker will be killed.
- cluster0 = self.cluster(1, args=args)
- cluster1 = self.cluster(1, args=args)
- assert 0 == subprocess.call(
- ["qpid-route", "route", "add", cluster0[0].host_port(),
- cluster1[0].host_port(), "dummy-exchange", "dummy-key", "-d"])
- cluster0.start()
-
- # Wait for qpid-tool:list on cluster0[0] to generate expected output.
- pattern = re.compile("org.apache.qpid.broker.*link")
- qpid_tool = subprocess.Popen(["qpid-tool", cluster0[0].host_port()],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE)
- class Scanner(Thread):
- def __init__(self): self.found = False; Thread.__init__(self)
- def run(self):
- for l in qpid_tool.stdout:
- if pattern.search(l): self.found = True; return
- scanner = Scanner()
- scanner.start()
- start = time.time()
- try:
- # Wait up to 5 second timeout for scanner to find expected output
- while not scanner.found and time.time() < start + 5:
- qpid_tool.stdin.write("list\n") # Ask qpid-tool to list
- for b in cluster0: b.ready() # Raise if any brokers are down
- finally:
- qpid_tool.stdin.write("quit\n")
- qpid_tool.wait()
- scanner.join()
- assert scanner.found
- # Regression test for https://issues.apache.org/jira/browse/QPID-3235
- # Inconsistent stats when changing elder.
-
- # Force a change of elder
- cluster0.start()
- for b in cluster0: b.ready()
- cluster0[0].expect=EXPECT_EXIT_FAIL # About to die.
- cluster0[0].kill()
- time.sleep(2) # Allow a management interval to pass.
- for b in cluster0[1:]: b.ready()
- # Verify logs are consistent
- cluster_test_logs.verify_logs()
-
- def test_redelivered(self):
- """Verify that redelivered flag is set correctly on replayed messages"""
- cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL)
- url = "amqp:tcp:%s,tcp:%s" % (cluster[0].host_port(), cluster[1].host_port())
- queue = "my-queue"
- cluster[0].declare_queue(queue)
- self.sender = self.popen(
- ["qpid-send",
- "--broker", url,
- "--address", queue,
- "--sequence=true",
- "--send-eos=1",
- "--messages=100000",
- "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS)
- ])
- self.receiver = self.popen(
- ["qpid-receive",
- "--broker", url,
- "--address", queue,
- "--ignore-duplicates",
- "--check-redelivered",
- "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
- "--forever"
- ])
- time.sleep(1)#give sender enough time to have some messages to replay
- cluster[0].kill()
- self.sender.wait()
- self.receiver.wait()
- cluster[1].kill()
-
- class BlockedSend(Thread):
- """Send a message, send is expected to block.
- Verify that it does block (for a given timeout), then allow
- waiting till it unblocks when it is expected to do so."""
- def __init__(self, sender, msg):
- self.sender, self.msg = sender, msg
- self.blocked = True
- self.condition = Condition()
- self.timeout = 0.1 # Time to wait for expected results.
- Thread.__init__(self)
- def run(self):
- try:
- self.sender.send(self.msg, sync=True)
- self.condition.acquire()
- try:
- self.blocked = False
- self.condition.notify()
- finally: self.condition.release()
- except Exception,e: print "BlockedSend exception: %s"%e
- def start(self):
- Thread.start(self)
- time.sleep(self.timeout)
- assert self.blocked # Expected to block
- def assert_blocked(self): assert self.blocked
- def wait(self): # Now expecting to unblock
- self.condition.acquire()
- try:
- while self.blocked:
- self.condition.wait(self.timeout)
- if self.blocked: raise Exception("Timed out waiting for send to unblock")
- finally: self.condition.release()
- self.join()
-
- def queue_flowlimit_test(self, brokers):
- """Verify that the queue's flowlimit configuration and state are
- correctly replicated.
- The brokers argument allows this test to run on single broker,
- cluster of 2 pre-startd brokers or cluster where second broker
- starts after queue is in flow control.
- """
- # configure a queue with a specific flow limit on first broker
- ssn0 = brokers.first().connect().session()
- s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
- brokers.first().startQmf()
- q1 = [q for q in brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
- oid = q1.getObjectId()
- self.assertEqual(q1.name, "flq")
- self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
- assert not q1.flowStopped
- self.assertEqual(q1.flowStoppedCount, 0)
-
- # fill the queue on one broker until flow control is active
- for x in range(5): s0.send(Message(str(x)))
- sender = ShortTests.BlockedSend(s0, Message(str(6)))
- sender.start() # Tests that sender does block
- # Verify the broker queue goes into a flowStopped state
- deadline = time.time() + 1
- while not q1.flowStopped and time.time() < deadline: q1.update()
- assert q1.flowStopped
- self.assertEqual(q1.flowStoppedCount, 1)
- sender.assert_blocked() # Still blocked
-
- # Now verify the both brokers in cluster have same configuration
- brokers.second().startQmf()
- qs = brokers.second().qmf_session.getObjects(_objectId=oid)
- self.assertEqual(len(qs), 1)
- q2 = qs[0]
- self.assertEqual(q2.name, "flq")
- self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
- assert q2.flowStopped
- self.assertEqual(q2.flowStoppedCount, 1)
-
- # now drain the queue using a session to the other broker
- ssn1 = brokers.second().connect().session()
- r1 = ssn1.receiver("flq", capacity=6)
- for x in range(4):
- r1.fetch(timeout=0)
- ssn1.acknowledge()
- sender.wait() # Verify no longer blocked.
-
- # and re-verify state of queue on both brokers
- q1.update()
- assert not q1.flowStopped
- q2.update()
- assert not q2.flowStopped
-
- ssn0.connection.close()
- ssn1.connection.close()
- cluster_test_logs.verify_logs()
-
- def test_queue_flowlimit(self):
- """Test flow limits on a standalone broker"""
- broker = self.broker()
- class Brokers:
- def first(self): return broker
- def second(self): return broker
- self.queue_flowlimit_test(Brokers())
-
- def test_queue_flowlimit_cluster(self):
- cluster = self.cluster(2)
- class Brokers:
- def first(self): return cluster[0]
- def second(self): return cluster[1]
- self.queue_flowlimit_test(Brokers())
-
- def test_queue_flowlimit_cluster_join(self):
- cluster = self.cluster(1)
- class Brokers:
- def first(self): return cluster[0]
- def second(self):
- if len(cluster) == 1: cluster.start()
- return cluster[1]
- self.queue_flowlimit_test(Brokers())
-
- def test_queue_flowlimit_replicate(self):
- """ Verify that a queue which is in flow control BUT has drained BELOW
- the flow control 'stop' threshold, is correctly replicated when a new
- broker is added to the cluster.
- """
-
- class AsyncSender(Thread):
- """Send a fixed number of msgs from a sender in a separate thread
- so it may block without blocking the test.
- """
- def __init__(self, broker, address, count=1, size=4):
- Thread.__init__(self)
- self.daemon = True
- self.broker = broker
- self.queue = address
- self.count = count
- self.size = size
- self.done = False
-
- def run(self):
- self.sender = subprocess.Popen(["qpid-send",
- "--capacity=1",
- "--content-size=%s" % self.size,
- "--messages=%s" % self.count,
- "--failover-updates",
- "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
- "--address=%s" % self.queue,
- "--broker=%s" % self.broker.host_port()])
- self.sender.wait()
- self.done = True
-
- cluster = self.cluster(2)
- # create a queue with rather draconian flow control settings
- ssn0 = cluster[0].connect().session()
- s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':100, 'qpid.flow_resume_count':20}}}}")
-
- # fire off the sending thread to broker[0], and wait until the queue
- # hits flow control on broker[1]
- sender = AsyncSender(cluster[0], "flq", count=110);
- sender.start();
-
- cluster[1].startQmf()
- q_obj = [q for q in cluster[1].qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
- deadline = time.time() + 10
- while not q_obj.flowStopped and time.time() < deadline:
- q_obj.update()
- assert q_obj.flowStopped
- assert not sender.done
- assert q_obj.msgDepth < 110
-
- # Now drain enough messages on broker[1] to drop below the flow stop
- # threshold, but not relieve flow control...
- receiver = subprocess.Popen(["qpid-receive",
- "--messages=15",
- "--timeout=1",
- "--print-content=no",
- "--failover-updates",
- "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
- "--ack-frequency=1",
- "--address=flq",
- "--broker=%s" % cluster[1].host_port()])
- receiver.wait()
- q_obj.update()
- assert q_obj.flowStopped
- assert not sender.done
- current_depth = q_obj.msgDepth
-
- # add a new broker to the cluster, and verify that the queue is in flow
- # control on that broker
- cluster.start()
- cluster[2].startQmf()
- q_obj = [q for q in cluster[2].qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
- assert q_obj.flowStopped
- assert q_obj.msgDepth == current_depth
-
- # now drain the queue on broker[2], and verify that the sender becomes
- # unblocked
- receiver = subprocess.Popen(["qpid-receive",
- "--messages=95",
- "--timeout=1",
- "--print-content=no",
- "--failover-updates",
- "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
- "--ack-frequency=1",
- "--address=flq",
- "--broker=%s" % cluster[2].host_port()])
- receiver.wait()
- q_obj.update()
- assert not q_obj.flowStopped
- self.assertEqual(q_obj.msgDepth, 0)
-
- # verify that the sender has become unblocked
- sender.join(timeout=5)
- assert not sender.isAlive()
- assert sender.done
-
- def test_blocked_queue_delete(self):
- """Verify that producers which are blocked on a queue due to flow
- control are unblocked when that queue is deleted.
- """
-
- cluster = self.cluster(2)
- cluster[0].startQmf()
- cluster[1].startQmf()
-
- # configure a queue with a specific flow limit on first broker
- ssn0 = cluster[0].connect().session()
- s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
- q1 = [q for q in cluster[0].qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
- oid = q1.getObjectId()
- self.assertEqual(q1.name, "flq")
- self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
- assert not q1.flowStopped
- self.assertEqual(q1.flowStoppedCount, 0)
-
- # fill the queue on one broker until flow control is active
- for x in range(5): s0.send(Message(str(x)))
- sender = ShortTests.BlockedSend(s0, Message(str(6)))
- sender.start() # Tests that sender does block
- # Verify the broker queue goes into a flowStopped state
- deadline = time.time() + 1
- while not q1.flowStopped and time.time() < deadline: q1.update()
- assert q1.flowStopped
- self.assertEqual(q1.flowStoppedCount, 1)
- sender.assert_blocked() # Still blocked
-
- # Now verify the both brokers in cluster have same configuration
- qs = cluster[1].qmf_session.getObjects(_objectId=oid)
- self.assertEqual(len(qs), 1)
- q2 = qs[0]
- self.assertEqual(q2.name, "flq")
- self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
- assert q2.flowStopped
- self.assertEqual(q2.flowStoppedCount, 1)
-
- # now delete the blocked queue from other broker
- ssn1 = cluster[1].connect().session()
- self.evaluate_address(ssn1, "flq;{delete:always}")
- sender.wait() # Verify no longer blocked.
-
- ssn0.connection.close()
- ssn1.connection.close()
- cluster_test_logs.verify_logs()
-
-
- def test_alternate_exchange_update(self):
- """Verify that alternate-exchange on exchanges and queues is propagated to new members of a cluster. """
- cluster = self.cluster(1)
- s0 = cluster[0].connect().session()
- # create alt queue bound to amq.fanout exchange, will be destination for alternate exchanges
- self.evaluate_address(s0, "alt;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:alt}]}}")
- # create direct exchange ex with alternate-exchange amq.fanout and no queues bound
- self.evaluate_address(s0, "ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'amq.fanout'}}}")
- # create queue q with alternate-exchange amq.fanout
- self.evaluate_address(s0, "q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'amq.fanout'}}}")
-
- def verify(broker):
- s = broker.connect().session()
- # Verify unmatched message goes to ex's alternate.
- s.sender("ex").send("foo")
- self.assertEqual("foo", s.receiver("alt").fetch(timeout=0).content)
- # Verify rejected message goes to q's alternate.
- s.sender("q").send("bar")
- msg = s.receiver("q").fetch(timeout=0)
- self.assertEqual("bar", msg.content)
- s.acknowledge(msg, Disposition(REJECTED)) # Reject the message
- self.assertEqual("bar", s.receiver("alt").fetch(timeout=0).content)
-
- verify(cluster[0])
- cluster.start()
- verify(cluster[1])
-
- def test_binding_order(self):
- """Regression test for binding order inconsistency in cluster"""
- cluster = self.cluster(1)
- c0 = cluster[0].connect()
- s0 = c0.session()
- # Declare multiple queues bound to same key on amq.topic
- def declare(q,max=0):
- if max: declare = 'x-declare:{arguments:{"qpid.max_count":%d, "qpid.flow_stop_count":0}}'%max
- else: declare = 'x-declare:{}'
- bind='x-bindings:[{queue:%s,key:key,exchange:"amq.topic"}]'%(q)
- s0.sender("%s;{create:always,node:{%s,%s}}" % (q,declare,bind))
- declare('d',max=4) # Only one with a limit
- for q in ['c', 'b','a']: declare(q)
- # Add a cluster member, send enough messages to exceed the max count
- cluster.start()
- try:
- s = s0.sender('amq.topic/key')
- for m in xrange(1,6): s.send(Message(str(m)))
- self.fail("Expected capacity exceeded exception")
- except messaging.exceptions.TargetCapacityExceeded: pass
- c1 = cluster[1].connect()
- s1 = c1.session()
- s0 = c0.session() # Old session s0 is broken by exception.
- # Verify queue contents are consistent.
- for q in ['a','b','c','d']:
- self.assertEqual(self.browse(s0, q), self.browse(s1, q))
- # Verify queue contents are "best effort"
- for q in ['a','b','c']: self.assert_browse(s1,q,[str(n) for n in xrange(1,6)])
- self.assert_browse(s1,'d',[str(n) for n in xrange(1,5)])
-
- def test_deleted_exchange(self):
- """QPID-3215: cached exchange reference can cause cluster inconsistencies
- if exchange is deleted/recreated
- Verify stand-alone case
- """
- cluster = self.cluster()
- # Verify we do not route message via an exchange that has been destroyed.
- cluster.start()
- s0 = cluster[0].connect().session()
- self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}")
- self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}")
- send0 = s0.sender("ex/foo")
- send0.send("foo")
- self.assert_browse(s0, "q", ["foo"])
- self.evaluate_address(s0, "ex;{delete:always}")
- try:
- send0.send("bar") # Should fail, exchange is deleted.
- self.fail("Expected not-found exception")
- except qpid.messaging.NotFound: pass
- self.assert_browse(cluster[0].connect().session(), "q", ["foo"])
-
- def test_deleted_exchange_inconsistent(self):
- """QPID-3215: cached exchange reference can cause cluster inconsistencies
- if exchange is deleted/recreated
-
- Verify cluster inconsistency.
- """
- cluster = self.cluster()
- cluster.start()
- s0 = cluster[0].connect().session()
- self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}")
- self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}")
- send0 = s0.sender("ex/foo")
- send0.send("foo")
- self.assert_browse(s0, "q", ["foo"])
-
- cluster.start()
- s1 = cluster[1].connect().session()
- self.evaluate_address(s0, "ex;{delete:always}")
- try:
- send0.send("bar")
- self.fail("Expected not-found exception")
- except qpid.messaging.NotFound: pass
-
- self.assert_browse(s1, "q", ["foo"])
-
-
- def test_ttl_consistent(self):
- """Ensure we don't get inconsistent errors with message that have TTL very close together"""
- messages = [ Message(str(i), ttl=i/1000.0) for i in xrange(0,1000)]
- messages.append(Message("x"))
- cluster = self.cluster(2)
- sender = cluster[0].connect().session().sender("q;{create:always}")
-
- def fetch(b):
- receiver = b.connect().session().receiver("q;{create:always}")
- while receiver.fetch().content != "x": pass
-
- for m in messages: sender.send(m, sync=False)
- for m in messages: sender.send(m, sync=False)
- fetch(cluster[0])
- fetch(cluster[1])
- for m in messages: sender.send(m, sync=False)
- cluster.start()
- fetch(cluster[2])
-
-
- def _verify_federation(self, src_broker, src, dst_broker, dst, timeout=30):
- """ Prove that traffic can pass between two federated brokers.
- """
- tot_time = 0
- active = False
- send_session = src_broker.connect().session()
- sender = send_session.sender(src)
- receive_session = dst_broker.connect().session()
- receiver = receive_session.receiver(dst)
- while not active and tot_time < timeout:
- sender.send(Message("Hello from Source!"))
- try:
- receiver.fetch(timeout = 1)
- receive_session.acknowledge()
- # Get this far without Empty exception, and the link is good!
- active = True
- while True:
- # Keep receiving msgs, as several may have accumulated
- receiver.fetch(timeout = 1)
- receive_session.acknowledge()
- except Empty:
- if not active:
- tot_time += 1
- receiver.close()
- receive_session.close()
- sender.close()
- send_session.close()
- return active
-
- def test_federation_failover(self):
- """
- Verify that federation operates across failures occuring in a cluster.
- Specifically:
- 1) Destination cluster learns of membership changes in the source
- cluster
- 2) Destination cluster replicates the current state of the source
- cluster to newly-added members
- """
-
- # 2 node cluster source, 2 node cluster destination
- src_cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL)
- src_cluster.ready();
- dst_cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL)
- dst_cluster.ready();
-
- cmd = self.popen(["qpid-config",
- "--broker", src_cluster[0].host_port(),
- "add", "queue", "srcQ"], EXPECT_EXIT_OK)
- cmd.wait()
-
- cmd = self.popen(["qpid-config",
- "--broker", dst_cluster[0].host_port(),
- "add", "exchange", "fanout", "destX"], EXPECT_EXIT_OK)
- cmd.wait()
-
- cmd = self.popen(["qpid-config",
- "--broker", dst_cluster[0].host_port(),
- "add", "queue", "destQ"], EXPECT_EXIT_OK)
- cmd.wait()
-
- cmd = self.popen(["qpid-config",
- "--broker", dst_cluster[0].host_port(),
- "bind", "destX", "destQ"], EXPECT_EXIT_OK)
- cmd.wait()
-
- # federate the srcQ to the destination exchange
- dst_cluster[0].startQmf()
- dst_broker = dst_cluster[0].qmf_session.getObjects(_class="broker")[0]
- result = dst_broker.connect(src_cluster[0].host(), src_cluster[0].port(), False, "PLAIN",
- "guest", "guest", "tcp")
- self.assertEqual(result.status, 0, result);
-
- link = dst_cluster[0].qmf_session.getObjects(_class="link")[0]
- result = link.bridge(False, "srcQ", "destX", "", "", "", True, False, False, 10)
- self.assertEqual(result.status, 0, result)
-
- # check that traffic passes
- assert self._verify_federation(src_cluster[0], "srcQ", dst_cluster[0], "destQ")
-
- # add src[2] broker to source cluster
- src_cluster.start(expect=EXPECT_EXIT_FAIL);
- src_cluster.ready();
- assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
-
- # Kill src[0]. dst[0] should fail over to src[1]
- src_cluster[0].kill()
- for b in src_cluster[1:]: b.ready()
- assert self._verify_federation(src_cluster[1], "srcQ", dst_cluster[0], "destQ")
-
- # Kill src[1], dst[0] should fail over to src[2]
- src_cluster[1].kill()
- for b in src_cluster[2:]: b.ready()
- assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
-
- # Kill dest[0], force failover to dest[1]
- dst_cluster[0].kill()
- for b in dst_cluster[1:]: b.ready()
- assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
-
- # Add dest[2]
- # dest[1] syncs dest[2] to current remote state
- dst_cluster.start(expect=EXPECT_EXIT_FAIL);
- for b in dst_cluster[1:]: b.ready()
- assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
-
- # Kill dest[1], force failover to dest[2]
- dst_cluster[1].kill()
- for b in dst_cluster[2:]: b.ready()
- assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[2], "destQ")
-
- for i in range(2, len(src_cluster)): src_cluster[i].kill()
- for i in range(2, len(dst_cluster)): dst_cluster[i].kill()
-
-
- def test_federation_multilink_failover(self):
- """
- Verify that multi-link federation operates across failures occuring in
- a cluster.
- """
-
- # 1 node cluster source, 1 node cluster destination
- src_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL)
- src_cluster.ready();
- dst_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL)
- dst_cluster.ready();
-
- # federate a direct binding across two separate links
-
- # first, create a direct exchange bound to two queues using different
- # bindings
- cmd = self.popen(["qpid-config",
- "--broker", src_cluster[0].host_port(),
- "add", "exchange", "direct", "FedX"],
- EXPECT_EXIT_OK)
- cmd.wait()
-
- cmd = self.popen(["qpid-config",
- "--broker", dst_cluster[0].host_port(),
- "add", "exchange", "direct", "FedX"],
- EXPECT_EXIT_OK)
- cmd.wait()
-
- cmd = self.popen(["qpid-config",
- "--broker", dst_cluster[0].host_port(),
- "add", "queue", "destQ1"],
- EXPECT_EXIT_OK)
- cmd.wait()
-
- cmd = self.popen(["qpid-config",
- "--broker", dst_cluster[0].host_port(),
- "bind", "FedX", "destQ1", "one"],
- EXPECT_EXIT_OK)
- cmd.wait()
-
- cmd = self.popen(["qpid-config",
- "--broker", dst_cluster[0].host_port(),
- "add", "queue", "destQ2"],
- EXPECT_EXIT_OK)
- cmd.wait()
-
- cmd = self.popen(["qpid-config",
- "--broker", dst_cluster[0].host_port(),
- "bind", "FedX", "destQ2", "two"],
- EXPECT_EXIT_OK)
- cmd.wait()
-
- # Create two separate links between the dst and source brokers, bind
- # each to different keys
- dst_cluster[0].startQmf()
- dst_broker = dst_cluster[0].qmf_session.getObjects(_class="broker")[0]
-
- for _l in [("link1", "bridge1", "one"),
- ("link2", "bridge2", "two")]:
- result = dst_broker.create("link", _l[0],
- {"host":src_cluster[0].host(),
- "port":src_cluster[0].port()},
- False)
- self.assertEqual(result.status, 0, result);
- result = dst_broker.create("bridge", _l[1],
- {"link":_l[0],
- "src":"FedX",
- "dest":"FedX",
- "key":_l[2]}, False)
- self.assertEqual(result.status, 0);
-
- # check that traffic passes
- assert self._verify_federation(src_cluster[0], "FedX/one", dst_cluster[0], "destQ1")
- assert self._verify_federation(src_cluster[0], "FedX/two", dst_cluster[0], "destQ2")
-
- # add new member, verify traffic
- src_cluster.start(expect=EXPECT_EXIT_FAIL);
- src_cluster.ready();
-
- dst_cluster.start(expect=EXPECT_EXIT_FAIL);
- dst_cluster.ready();
-
- assert self._verify_federation(src_cluster[0], "FedX/one", dst_cluster[0], "destQ1")
- assert self._verify_federation(src_cluster[0], "FedX/two", dst_cluster[0], "destQ2")
-
- src_cluster[0].kill()
- for b in src_cluster[1:]: b.ready()
-
- assert self._verify_federation(src_cluster[1], "FedX/one", dst_cluster[0], "destQ1")
- assert self._verify_federation(src_cluster[1], "FedX/two", dst_cluster[0], "destQ2")
-
- dst_cluster[0].kill()
- for b in dst_cluster[1:]: b.ready()
-
- assert self._verify_federation(src_cluster[1], "FedX/one", dst_cluster[1], "destQ1")
- assert self._verify_federation(src_cluster[1], "FedX/two", dst_cluster[1], "destQ2")
-
- for i in range(1, len(src_cluster)): src_cluster[i].kill()
- for i in range(1, len(dst_cluster)): dst_cluster[i].kill()
-
-
-
-# Some utility code for transaction tests
-XA_RBROLLBACK = 1
-XA_RBTIMEOUT = 2
-XA_OK = 0
-dtx_branch_counter = 0
-
-class DtxStatusException(Exception):
- def __init__(self, expect, actual):
- self.expect = expect
- self.actual = actual
-
- def str(self):
- return "DtxStatusException(expect=%s, actual=%s)"%(self.expect, self.actual)
-
-class DtxTestFixture:
- """Bundle together some common requirements for dtx tests."""
- def __init__(self, test, broker, name, exclusive=False):
- self.test = test
- self.broker = broker
- self.name = name
- # Use old API. DTX is not supported in messaging API.
- self.connection = broker.connect_old()
- self.session = self.connection.session(name, 1) # 1 second timeout
- self.queue = self.session.queue_declare(name, exclusive=exclusive)
- self.session.dtx_select()
- self.consumer = None
-
- def xid(self, id=None):
- if id is None: id = self.name
- return self.session.xid(format=0, global_id=id)
-
- def check_status(self, expect, actual):
- if expect != actual: raise DtxStatusException(expect, actual)
-
- def start(self, id=None, resume=False):
- self.check_status(XA_OK, self.session.dtx_start(xid=self.xid(id), resume=resume).status)
-
- def end(self, id=None, suspend=False):
- self.check_status(XA_OK, self.session.dtx_end(xid=self.xid(id), suspend=suspend).status)
-
- def prepare(self, id=None):
- self.check_status(XA_OK, self.session.dtx_prepare(xid=self.xid(id)).status)
-
- def commit(self, id=None, one_phase=True):
- self.check_status(
- XA_OK, self.session.dtx_commit(xid=self.xid(id), one_phase=one_phase).status)
-
- def rollback(self, id=None):
- self.check_status(XA_OK, self.session.dtx_rollback(xid=self.xid(id)).status)
-
- def set_timeout(self, timeout, id=None):
- self.session.dtx_set_timeout(xid=self.xid(id),timeout=timeout)
-
- def send(self, messages):
- for m in messages:
- dp=self.session.delivery_properties(routing_key=self.name)
- mp=self.session.message_properties()
- self.session.message_transfer(message=qpid.datatypes.Message(dp, mp, m))
-
- def accept(self):
- """Accept 1 message from queue"""
- consumer_tag="%s-consumer"%(self.name)
- self.session.message_subscribe(queue=self.name, destination=consumer_tag)
- self.session.message_flow(unit = self.session.credit_unit.message, value = 1, destination = consumer_tag)
- self.session.message_flow(unit = self.session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag)
- msg = self.session.incoming(consumer_tag).get(timeout=1)
- self.session.message_cancel(destination=consumer_tag)
- self.session.message_accept(qpid.datatypes.RangedSet(msg.id))
- return msg
-
-
- def verify(self, sessions, messages):
- for s in sessions:
- self.test.assert_browse(s, self.name, messages)
-
-class DtxTests(BrokerTest):
-
- def test_dtx_update(self):
- """Verify that DTX transaction state is updated to a new broker.
- Start a collection of transactions, then add a new cluster member,
- then verify they commit/rollback correctly on the new broker."""
-
- # Note: multiple test have been bundled into one to avoid the need to start/stop
- # multiple brokers per test.
-
- cluster=self.cluster(1)
- sessions = [cluster[0].connect().session()] # For verify
-
- # Transaction that will be open when new member joins, then committed.
- t1 = DtxTestFixture(self, cluster[0], "t1")
- t1.start()
- t1.send(["1", "2"])
- t1.verify(sessions, []) # Not visible outside of transaction
-
- # Transaction that will be open when new member joins, then rolled back.
- t2 = DtxTestFixture(self, cluster[0], "t2")
- t2.start()
- t2.send(["1", "2"])
-
- # Transaction that will be prepared when new member joins, then committed.
- t3 = DtxTestFixture(self, cluster[0], "t3")
- t3.start()
- t3.send(["1", "2"])
- t3.end()
- t3.prepare()
- t1.verify(sessions, []) # Not visible outside of transaction
-
- # Transaction that will be prepared when new member joins, then rolled back.
- t4 = DtxTestFixture(self, cluster[0], "t4")
- t4.start()
- t4.send(["1", "2"])
- t4.end()
- t4.prepare()
-
- # Transaction using an exclusive queue
- t5 = DtxTestFixture(self, cluster[0], "t5", exclusive=True)
- t5.start()
- t5.send(["1", "2"])
-
- # Accept messages in a transaction before/after join then commit
- # Note: Message sent outside transaction, we're testing transactional acceptance.
- t6 = DtxTestFixture(self, cluster[0], "t6")
- t6.send(["a","b","c"])
- t6.start()
- self.assertEqual(t6.accept().body, "a");
- t6.verify(sessions, ["b", "c"])
-
- # Accept messages in a transaction before/after join then roll back
- # Note: Message sent outside transaction, we're testing transactional acceptance.
- t7 = DtxTestFixture(self, cluster[0], "t7")
- t7.send(["a","b","c"])
- t7.start()
- self.assertEqual(t7.accept().body, "a");
- t7.verify(sessions, ["b", "c"])
-
- # Ended, suspended transactions across join.
- t8 = DtxTestFixture(self, cluster[0], "t8")
- t8.start(id="1")
- t8.send(["x"])
- t8.end(id="1", suspend=True)
- t8.start(id="2")
- t8.send(["y"])
- t8.end(id="2")
- t8.start()
- t8.send("z")
-
-
- # Start new cluster member
- cluster.start()
- sessions.append(cluster[1].connect().session())
-
- # Commit t1
- t1.send(["3","4"])
- t1.verify(sessions, [])
- t1.end()
- t1.commit(one_phase=True)
- t1.verify(sessions, ["1","2","3","4"])
-
- # Rollback t2
- t2.send(["3","4"])
- t2.end()
- t2.rollback()
- t2.verify(sessions, [])
-
- # Commit t3
- t3.commit(one_phase=False)
- t3.verify(sessions, ["1","2"])
-
- # Rollback t4
- t4.rollback()
- t4.verify(sessions, [])
-
- # Commit t5
- t5.send(["3","4"])
- t5.verify(sessions, [])
- t5.end()
- t5.commit(one_phase=True)
- t5.verify(sessions, ["1","2","3","4"])
-
- # Commit t6
- self.assertEqual(t6.accept().body, "b");
- t6.verify(sessions, ["c"])
- t6.end()
- t6.commit(one_phase=True)
- t6.session.close() # Make sure they're not requeued by the session.
- t6.verify(sessions, ["c"])
-
- # Rollback t7
- self.assertEqual(t7.accept().body, "b");
- t7.verify(sessions, ["c"])
- t7.end()
- t7.rollback()
- t7.verify(sessions, ["a", "b", "c"])
-
- # Resume t8
- t8.end()
- t8.commit(one_phase=True)
- t8.start("1", resume=True)
- t8.end("1")
- t8.commit("1", one_phase=True)
- t8.commit("2", one_phase=True)
- t8.verify(sessions, ["z", "x","y"])
-
-
- def test_dtx_failover_rollback(self):
- """Kill a broker during a transaction, verify we roll back correctly"""
- cluster=self.cluster(1, expect=EXPECT_EXIT_FAIL)
- cluster.start(expect=EXPECT_RUNNING)
-
- # Test unprepared at crash
- t1 = DtxTestFixture(self, cluster[0], "t1")
- t1.send(["a"]) # Not in transaction
- t1.start()
- t1.send(["b"]) # In transaction
-
- # Test prepared at crash
- t2 = DtxTestFixture(self, cluster[0], "t2")
- t2.send(["a"]) # Not in transaction
- t2.start()
- t2.send(["b"]) # In transaction
- t2.end()
- t2.prepare()
-
- # Crash the broker
- cluster[0].kill()
-
- # Transactional changes should not appear
- s = cluster[1].connect().session();
- self.assert_browse(s, "t1", ["a"])
- self.assert_browse(s, "t2", ["a"])
-
- def test_dtx_timeout(self):
- """Verify that dtx timeout works"""
- cluster = self.cluster(1)
- t1 = DtxTestFixture(self, cluster[0], "t1")
- t1.start()
- t1.set_timeout(1)
- time.sleep(1.1)
- try:
- t1.end()
- self.fail("Expected rollback timeout.")
- except DtxStatusException, e:
- self.assertEqual(e.actual, XA_RBTIMEOUT)
-
-class TxTests(BrokerTest):
-
- def test_tx_update(self):
- """Verify that transaction state is updated to a new broker"""
-
- def make_message(session, body=None, key=None, id=None):
- dp=session.delivery_properties(routing_key=key)
- mp=session.message_properties(correlation_id=id)
- return qpid.datatypes.Message(dp, mp, body)
-
- cluster=self.cluster(1)
- # Use old API. TX is not supported in messaging API.
- c = cluster[0].connect_old()
- s = c.session("tx-session", 1)
- s.queue_declare(queue="q")
- # Start transaction
- s.tx_select()
- s.message_transfer(message=make_message(s, "1", "q"))
- # Start new member mid-transaction
- cluster.start()
- # Do more work
- s.message_transfer(message=make_message(s, "2", "q"))
- # Commit the transaction and verify the results.
- s.tx_commit()
- for b in cluster: self.assert_browse(b.connect().session(), "q", ["1","2"])
-
-
-class LongTests(BrokerTest):
- """Tests that can run for a long time if -DDURATION=<minutes> is set"""
- def duration(self):
- d = self.config.defines.get("DURATION")
- if d: return float(d)*60
- else: return 3 # Default is to be quick
-
- def test_failover(self):
- """Test fail-over during continuous send-receive with errors"""
-
- # Original cluster will all be killed so expect exit with failure
- cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
- for b in cluster: b.ready() # Wait for brokers to be ready
- for b in cluster: ErrorGenerator(b)
-
- # Start sender and receiver threads
- cluster[0].declare_queue("test-queue")
- sender = NumberedSender(cluster[0], max_depth=1000)
- receiver = NumberedReceiver(cluster[0], sender=sender)
- receiver.start()
- sender.start()
- # Wait for sender & receiver to get up and running
- retry(lambda: receiver.received > 0)
-
- # Kill original brokers, start new ones for the duration.
- endtime = time.time() + self.duration()
- i = 0
- while time.time() < endtime:
- sender.sender.assert_running()
- receiver.receiver.assert_running()
- cluster[i].kill()
- i += 1
- b = cluster.start(expect=EXPECT_EXIT_FAIL)
- for b in cluster[i:]: b.ready()
- ErrorGenerator(b)
- time.sleep(5)
- sender.stop()
- receiver.stop()
- for i in range(i, len(cluster)): cluster[i].kill()
-
- def test_management(self, args=[]):
- """
- Stress test: Run management clients and other clients concurrently
- while killing and restarting brokers.
- """
-
- class ClientLoop(StoppableThread):
- """Run a client executable in a loop."""
- def __init__(self, broker, cmd):
- StoppableThread.__init__(self)
- self.broker=broker
- self.cmd = cmd # Client command.
- self.lock = Lock()
- self.process = None # Client process.
- self.start()
-
- def run(self):
- try:
- while True:
- self.lock.acquire()
- try:
- if self.stopped: break
- self.process = self.broker.test.popen(
- self.cmd, expect=EXPECT_UNKNOWN)
- finally:
- self.lock.release()
- try:
- exit = self.process.wait()
- except OSError, e:
- # Process may already have been killed by self.stop()
- break
- except Exception, e:
- self.process.unexpected(
- "client of %s: %s"%(self.broker.name, e))
- self.lock.acquire()
- try:
- if self.stopped: break
- if exit != 0:
- self.process.unexpected(
- "client of %s exit code %s"%(self.broker.name, exit))
- finally:
- self.lock.release()
- except Exception, e:
- self.error = RethrownException("Error in ClientLoop.run")
-
- def stop(self):
- """Stop the running client and wait for it to exit"""
- self.lock.acquire()
- try:
- if self.stopped: return
- self.stopped = True
- if self.process:
- try: self.process.kill() # Kill the client.
- except OSError: pass # The client might not be running.
- finally: self.lock.release()
- StoppableThread.stop(self)
-
- # body of test_management()
-
- args += ["--mgmt-pub-interval", 1]
- args += ["--log-enable=trace+:management"]
- # Use store if present.
- if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
- cluster = self.cluster(3, args, expect=EXPECT_EXIT_FAIL) # brokers will be killed
-
- clients = [] # Per-broker list of clients that only connect to one broker.
- mclients = [] # Management clients that connect to every broker in the cluster.
-
- def start_clients(broker):
- """Start ordinary clients for a broker."""
- cmds=[
- ["qpid-tool", "localhost:%s"%(broker.port())],
- ["qpid-perftest", "--count=5000", "--durable=yes",
- "--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()],
- ["qpid-txtest", "--queue-base-name", "tx-%s"%str(qpid.datatypes.uuid4()),
- "--port", broker.port()],
- ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())]
- ]
- clients.append([ClientLoop(broker, cmd) for cmd in cmds])
-
- def start_mclients(broker):
- """Start management clients that make multiple connections."""
- cmd = ["qpid-cluster", "-C", "localhost:%s" %(broker.port())]
- mclients.append(ClientLoop(broker, cmd))
-
- endtime = time.time() + self.duration()
- # For long duration, first run is a quarter of the duration.
- runtime = min(5.0, self.duration() / 3.0)
- alive = 0 # First live cluster member
- for i in range(len(cluster)): start_clients(cluster[i])
- start_mclients(cluster[alive])
-
- while time.time() < endtime:
- time.sleep(runtime)
- runtime = 5 # Remaining runs 5 seconds, frequent broker kills
- for b in cluster[alive:]: b.ready() # Check if a broker crashed.
- # Kill the first broker, expect the clients to fail.
- b = cluster[alive]
- b.ready()
- b.kill()
- # Stop the brokers clients and all the mclients.
- for c in clients[alive] + mclients:
- try: c.stop()
- except: pass # Ignore expected errors due to broker shutdown.
- clients[alive] = []
- mclients = []
- # Start another broker and clients
- alive += 1
- cluster.start(expect=EXPECT_EXIT_FAIL)
- cluster[-1].ready() # Wait till its ready
- start_clients(cluster[-1])
- start_mclients(cluster[alive])
- for c in chain(mclients, *clients):
- c.stop()
- for b in cluster[alive:]:
- b.ready() # Verify still alive
- b.kill()
- # Verify that logs are consistent
- cluster_test_logs.verify_logs()
-
- def test_management_qmf2(self):
- self.test_management(args=["--mgmt-qmf2=yes"])
-
- def test_connect_consistent(self):
- args=["--mgmt-pub-interval=1","--log-enable=trace+:management"]
- cluster = self.cluster(2, args=args)
- end = time.time() + self.duration()
- while (time.time() < end): # Get a management interval
- for i in xrange(1000): cluster[0].connect().close()
- cluster_test_logs.verify_logs()
-
- def test_flowlimit_failover(self):
- """Test fail-over during continuous send-receive with flow control
- active.
- """
-
- # Original cluster will all be killed so expect exit with failure
- cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
- for b in cluster: b.ready() # Wait for brokers to be ready
-
- # create a queue with rather draconian flow control settings
- ssn0 = cluster[0].connect().session()
- s0 = ssn0.sender("test-queue; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':2000, 'qpid.flow_resume_count':100}}}}")
-
- receiver = NumberedReceiver(cluster[0])
- receiver.start()
- sender = NumberedSender(cluster[0])
- sender.start()
- # Wait for senders & receiver to get up and running
- retry(lambda: receiver.received > 10)
-
- # Kill original brokers, start new ones for the duration.
- endtime = time.time() + self.duration();
- i = 0
- while time.time() < endtime:
- sender.sender.assert_running()
- receiver.receiver.assert_running()
- for b in cluster[i:]: b.ready() # Check if any broker crashed.
- cluster[i].kill()
- i += 1
- b = cluster.start(expect=EXPECT_EXIT_FAIL)
- time.sleep(5)
- sender.stop()
- receiver.stop()
- for i in range(i, len(cluster)): cluster[i].kill()
-
- def test_ttl_failover(self):
- """Test that messages with TTL don't cause problems in a cluster with failover"""
-
- class Client(StoppableThread):
-
- def __init__(self, broker):
- StoppableThread.__init__(self)
- self.connection = broker.connect(reconnect=True)
- self.auto_fetch_reconnect_urls(self.connection)
- self.session = self.connection.session()
-
- def auto_fetch_reconnect_urls(self, conn):
- """Replacment for qpid.messaging.util version which is noisy"""
- ssn = conn.session("auto-fetch-reconnect-urls")
- rcv = ssn.receiver("amq.failover")
- rcv.capacity = 10
-
- def main():
- while True:
- try:
- msg = rcv.fetch()
- qpid.messaging.util.set_reconnect_urls(conn, msg)
- ssn.acknowledge(msg, sync=False)
- except messaging.exceptions.LinkClosed: return
- except messaging.exceptions.ConnectionError: return
-
- thread = Thread(name="auto-fetch-reconnect-urls", target=main)
- thread.setDaemon(True)
- thread.start()
-
- def stop(self):
- StoppableThread.stop(self)
- self.connection.detach()
-
- class Sender(Client):
- def __init__(self, broker, address):
- Client.__init__(self, broker)
- self.sent = 0 # Number of messages _reliably_ sent.
- self.sender = self.session.sender(address, capacity=1000)
-
- def send_counted(self, ttl):
- self.sender.send(Message(str(self.sent), ttl=ttl))
- self.sent += 1
-
- def run(self):
- while not self.stopped:
- choice = random.randint(0,4)
- if choice == 0: self.send_counted(None) # No ttl
- elif choice == 1: self.send_counted(100000) # Large ttl
- else: # Small ttl, might expire
- self.sender.send(Message("", ttl=random.random()/10))
- self.sender.send(Message("z"), sync=True) # Chaser.
-
- class Receiver(Client):
-
- def __init__(self, broker, address):
- Client.__init__(self, broker)
- self.received = 0 # Number of non-empty (reliable) messages received.
- self.receiver = self.session.receiver(address, capacity=1000)
- def run(self):
- try:
- while True:
- m = self.receiver.fetch(1)
- if m.content == "z": break
- if m.content: # Ignore unreliable messages
- # Ignore duplicates
- if int(m.content) == self.received: self.received += 1
- except Exception,e: self.error = e
-
- # def test_ttl_failover
-
- # Original cluster will all be killed so expect exit with failure
- # Set small purge interval.
- cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["--queue-purge-interval=1"])
- for b in cluster: b.ready() # Wait for brokers to be ready
-
- # Python client failover produces noisy WARN logs, disable temporarily
- logger = logging.getLogger()
- log_level = logger.getEffectiveLevel()
- logger.setLevel(logging.ERROR)
- sender = None
- receiver = None
- try:
- # Start sender and receiver threads
- receiver = Receiver(cluster[0], "q;{create:always}")
- receiver.start()
- sender = Sender(cluster[0], "q;{create:always}")
- sender.start()
- # Wait for sender & receiver to get up and running
- retry(lambda: receiver.received > 0)
-
- # Kill brokers in a cycle.
- endtime = time.time() + self.duration()
- runtime = min(5.0, self.duration() / 4.0)
- i = 0
- while time.time() < endtime:
- for b in cluster[i:]: b.ready() # Check if any broker crashed.
- cluster[i].kill()
- i += 1
- b = cluster.start(expect=EXPECT_EXIT_FAIL)
- b.ready()
- time.sleep(runtime)
- sender.stop()
- receiver.stop()
- for b in cluster[i:]:
- b.ready() # Check it didn't crash
- b.kill()
- self.assertEqual(sender.sent, receiver.received)
- cluster_test_logs.verify_logs()
-
- finally:
- # Detach to avoid slow reconnect attempts during shut-down if test fails.
- if sender: sender.connection.detach()
- if receiver: receiver.connection.detach()
- logger.setLevel(log_level)
-
- def test_msg_group_failover(self):
- """Test fail-over during continuous send-receive of grouped messages.
- """
-
- class GroupedTrafficGenerator(Thread):
- def __init__(self, url, queue, group_key):
- Thread.__init__(self)
- self.url = url
- self.queue = queue
- self.group_key = group_key
- self.status = -1
-
- def run(self):
- # generate traffic for approx 10 seconds (2011msgs / 200 per-sec)
- cmd = ["msg_group_test",
- "--broker=%s" % self.url,
- "--address=%s" % self.queue,
- "--connection-options={%s}" % (Cluster.CONNECTION_OPTIONS),
- "--group-key=%s" % self.group_key,
- "--receivers=2",
- "--senders=3",
- "--messages=2011",
- "--send-rate=200",
- "--capacity=11",
- "--ack-frequency=23",
- "--allow-duplicates",
- "--group-size=37",
- "--randomize-group-size",
- "--interleave=13"]
- # "--trace"]
- self.generator = Popen( cmd );
- self.status = self.generator.wait()
- return self.status
-
- def results(self):
- self.join(timeout=30) # 3x assumed duration
- if self.isAlive(): return -1
- return self.status
-
- # Original cluster will all be killed so expect exit with failure
- cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["-t"])
- for b in cluster: b.ready() # Wait for brokers to be ready
-
- # create a queue with rather draconian flow control settings
- ssn0 = cluster[0].connect().session()
- q_args = "{'qpid.group_header_key':'group-id', 'qpid.shared_msg_group':1}"
- s0 = ssn0.sender("test-group-q; {create:always, node:{type:queue, x-declare:{arguments:%s}}}" % q_args)
-
- # Kill original brokers, start new ones for the duration.
- endtime = time.time() + self.duration();
- i = 0
- while time.time() < endtime:
- traffic = GroupedTrafficGenerator( cluster[i].host_port(),
- "test-group-q", "group-id" )
- traffic.start()
- time.sleep(1)
-
- for x in range(2):
- for b in cluster[i:]: b.ready() # Check if any broker crashed.
- cluster[i].kill()
- i += 1
- b = cluster.start(expect=EXPECT_EXIT_FAIL)
- time.sleep(1)
-
- # wait for traffic to finish, verify success
- self.assertEqual(0, traffic.results())
-
- for i in range(i, len(cluster)): cluster[i].kill()
-
-
-class StoreTests(BrokerTest):
- """
- Cluster tests that can only be run if there is a store available.
- """
- def args(self):
- assert BrokerTest.store_lib
- return ["--load-module", BrokerTest.store_lib]
-
- def test_store_loaded(self):
- """Ensure we are indeed loading a working store"""
- broker = self.broker(self.args(), name="recoverme", expect=EXPECT_EXIT_FAIL)
- m = Message("x", durable=True)
- broker.send_message("q", m)
- broker.kill()
- broker = self.broker(self.args(), name="recoverme")
- self.assertEqual("x", broker.get_message("q").content)
-
- def test_kill_restart(self):
- """Verify we can kill/resetart a broker with store in a cluster"""
- cluster = self.cluster(1, self.args())
- cluster.start("restartme", expect=EXPECT_EXIT_FAIL).kill()
-
- # Send a message, retrieve from the restarted broker
- cluster[0].send_message("q", "x")
- m = cluster.start("restartme").get_message("q")
- self.assertEqual("x", m.content)
-
- def stop_cluster(self,broker):
- """Clean shut-down of a cluster"""
- self.assertEqual(0, qpid_cluster.main(
- ["-kf", broker.host_port()]))
-
- def test_persistent_restart(self):
- """Verify persistent cluster shutdown/restart scenarios"""
- cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"])
- a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
- b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
- c = cluster.start("c", expect=EXPECT_EXIT_FAIL, wait=True)
- a.send_message("q", Message("1", durable=True))
- # Kill & restart one member.
- c.kill()
- self.assertEqual(a.get_message("q").content, "1")
- a.send_message("q", Message("2", durable=True))
- c = cluster.start("c", expect=EXPECT_EXIT_OK)
- self.assertEqual(c.get_message("q").content, "2")
- # Shut down the entire cluster cleanly and bring it back up
- a.send_message("q", Message("3", durable=True))
- self.stop_cluster(a)
- a = cluster.start("a", wait=False)
- b = cluster.start("b", wait=False)
- c = cluster.start("c", wait=True)
- self.assertEqual(a.get_message("q").content, "3")
-
- def test_persistent_partial_failure(self):
- # Kill 2 members, shut down the last cleanly then restart
- # Ensure we use the clean database
- cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"])
- a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False)
- b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False)
- c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=True)
- a.send_message("q", Message("4", durable=True))
- a.kill()
- b.kill()
- self.assertEqual(c.get_message("q").content, "4")
- c.send_message("q", Message("clean", durable=True))
- self.stop_cluster(c)
- a = cluster.start("a", wait=False)
- b = cluster.start("b", wait=False)
- c = cluster.start("c", wait=True)
- self.assertEqual(a.get_message("q").content, "clean")
-
- def test_wrong_cluster_id(self):
- # Start a cluster1 broker, then try to restart in cluster2
- cluster1 = self.cluster(0, args=self.args())
- a = cluster1.start("a", expect=EXPECT_EXIT_OK)
- a.terminate()
- cluster2 = self.cluster(1, args=self.args())
- try:
- a = cluster2.start("a", expect=EXPECT_EXIT_FAIL)
- a.ready()
- self.fail("Expected exception")
- except: pass
-
- def test_wrong_shutdown_id(self):
- # Start 2 members and shut down.
- cluster = self.cluster(0, args=self.args()+["--cluster-size=2"])
- a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
- b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
- self.stop_cluster(a)
- self.assertEqual(a.wait(), 0)
- self.assertEqual(b.wait(), 0)
-
- # Restart with a different member and shut down.
- a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
- c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=False)
- self.stop_cluster(a)
- self.assertEqual(a.wait(), 0)
- self.assertEqual(c.wait(), 0)
- # Mix members from both shutdown events, they should fail
- # TODO aconway 2010-03-11: can't predict the exit status of these
- # as it depends on the order of delivery of initial-status messages.
- # See comment at top of this file.
- a = cluster.start("a", expect=EXPECT_UNKNOWN, wait=False)
- b = cluster.start("b", expect=EXPECT_UNKNOWN, wait=False)
- self.assertRaises(Exception, lambda: a.ready())
- self.assertRaises(Exception, lambda: b.ready())
-
- def test_solo_store_clean(self):
- # A single node cluster should always leave a clean store.
- cluster = self.cluster(0, self.args())
- a = cluster.start("a", expect=EXPECT_EXIT_FAIL)
- a.send_message("q", Message("x", durable=True))
- a.kill()
- a = cluster.start("a")
- self.assertEqual(a.get_message("q").content, "x")
-
- def test_last_store_clean(self):
- # Verify that only the last node in a cluster to shut down has
- # a clean store. Start with cluster of 3, reduce to 1 then
- # increase again to ensure that a node that was once alone but
- # finally did not finish as the last node does not get a clean
- # store.
- cluster = self.cluster(0, self.args())
- a = cluster.start("a", expect=EXPECT_EXIT_FAIL)
- self.assertEqual(a.store_state(), "clean")
- b = cluster.start("b", expect=EXPECT_EXIT_FAIL)
- c = cluster.start("c", expect=EXPECT_EXIT_FAIL)
- self.assertEqual(b.store_state(), "dirty")
- self.assertEqual(c.store_state(), "dirty")
- retry(lambda: a.store_state() == "dirty")
-
- a.send_message("q", Message("x", durable=True))
- a.kill()
- b.kill() # c is last man, will mark store clean
- retry(lambda: c.store_state() == "clean")
- a = cluster.start("a", expect=EXPECT_EXIT_FAIL) # c no longer last man
- retry(lambda: c.store_state() == "dirty")
- c.kill() # a is now last man
- retry(lambda: a.store_state() == "clean")
- a.kill()
- self.assertEqual(a.store_state(), "clean")
- self.assertEqual(b.store_state(), "dirty")
- self.assertEqual(c.store_state(), "dirty")
-
- def test_restart_clean(self):
- """Verify that we can re-start brokers one by one in a
- persistent cluster after a clean oshutdown"""
- cluster = self.cluster(0, self.args())
- a = cluster.start("a", expect=EXPECT_EXIT_OK)
- b = cluster.start("b", expect=EXPECT_EXIT_OK)
- c = cluster.start("c", expect=EXPECT_EXIT_OK)
- a.send_message("q", Message("x", durable=True))
- self.stop_cluster(a)
- a = cluster.start("a")
- b = cluster.start("b")
- c = cluster.start("c")
- self.assertEqual(c.get_message("q").content, "x")
-
- def test_join_sub_size(self):
- """Verify that after starting a cluster with cluster-size=N,
- we can join new members even if size < N-1"""
- cluster = self.cluster(0, self.args()+["--cluster-size=3"])
- a = cluster.start("a", wait=False, expect=EXPECT_EXIT_FAIL)
- b = cluster.start("b", wait=False, expect=EXPECT_EXIT_FAIL)
- c = cluster.start("c")
- a.send_message("q", Message("x", durable=True))
- a.send_message("q", Message("y", durable=True))
- a.kill()
- b.kill()
- a = cluster.start("a")
- self.assertEqual(c.get_message("q").content, "x")
- b = cluster.start("b")
- self.assertEqual(c.get_message("q").content, "y")