summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/cluster_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/cluster_tests.py')
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py572
1 files changed, 54 insertions, 518 deletions
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 807e9508c3..3e13a3ce8a 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -18,12 +18,11 @@
# under the License.
#
-import os, signal, sys, time, imp, re, subprocess, glob, random, logging
-import cluster_test_logs
+import os, signal, sys, time, imp, re, subprocess, glob, cluster_test_logs
from qpid import datatypes, messaging
from brokertest import *
from qpid.harness import Skipped
-from qpid.messaging import Message, Empty, Disposition, REJECTED, util
+from qpid.messaging import Message, Empty
from threading import Thread, Lock, Condition
from logging import getLogger
from itertools import chain
@@ -97,15 +96,9 @@ class ShortTests(BrokerTest):
destination="amq.direct",
message=qpid.datatypes.Message(props, "content"))
- # Try message with TTL and differnet headers/properties
- cluster[0].send_message("q", Message(durable=True, ttl=100000))
- cluster[0].send_message("q", Message(durable=True, properties={}, ttl=100000))
- cluster[0].send_message("q", Message(durable=True, properties={"x":10}, ttl=100000))
-
# Now update a new member and compare their dumps.
cluster.start(args=["--test-store-dump", "updatee.dump"])
assert readfile("direct.dump") == readfile("updatee.dump")
-
os.remove("direct.dump")
os.remove("updatee.dump")
@@ -253,6 +246,25 @@ acl allow all all
session1 = cluster[1].connect().session()
for q in queues: self.assert_browse(session1, "q1", ["foo"])
+ def test_dr_no_message(self):
+ """Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=655141
+ Joining broker crashes with 'error deliveryRecord no update message'
+ """
+
+ cluster = self.cluster(1)
+ session0 = cluster[0].connect().session()
+ s = session0.sender("q1;{create:always}")
+ s.send(Message("a", ttl=0.05), sync=False)
+ s.send(Message("b", ttl=0.05), sync=False)
+ r1 = session0.receiver("q1")
+ self.assertEqual("a", r1.fetch(timeout=0).content)
+ r2 = session0.receiver("q1;{mode:browse}")
+ self.assertEqual("b", r2.fetch(timeout=0).content)
+ # Leave messages un-acknowledged, let the expire, then start new broker.
+ time.sleep(.1)
+ cluster.start()
+ self.assertRaises(Empty, cluster[1].connect().session().receiver("q1").fetch,0)
+
def test_route_update(self):
"""Regression test for https://issues.apache.org/jira/browse/QPID-2982
Links and bridges associated with routes were not replicated on update.
@@ -260,7 +272,6 @@ acl allow all all
client was attached.
"""
args=["--mgmt-pub-interval=1","--log-enable=trace+:management"]
- # First broker will be killed.
cluster0 = self.cluster(1, args=args)
cluster1 = self.cluster(1, args=args)
assert 0 == subprocess.call(
@@ -290,47 +301,9 @@ acl allow all all
qpid_tool.wait()
scanner.join()
assert scanner.found
- # Regression test for https://issues.apache.org/jira/browse/QPID-3235
- # Inconsistent stats when changing elder.
-
- # Force a change of elder
- cluster0.start()
- cluster0[0].expect=EXPECT_EXIT_FAIL # About to die.
- cluster0[0].kill()
- time.sleep(2) # Allow a management interval to pass.
# Verify logs are consistent
cluster_test_logs.verify_logs()
- def test_redelivered(self):
- """Verify that redelivered flag is set correctly on replayed messages"""
- cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL)
- url = "amqp:tcp:%s,tcp:%s" % (cluster[0].host_port(), cluster[1].host_port())
- queue = "my-queue"
- cluster[0].declare_queue(queue)
- self.sender = self.popen(
- ["qpid-send",
- "--broker", url,
- "--address", queue,
- "--sequence=true",
- "--send-eos=1",
- "--messages=100000",
- "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS)
- ])
- self.receiver = self.popen(
- ["qpid-receive",
- "--broker", url,
- "--address", queue,
- "--ignore-duplicates",
- "--check-redelivered",
- "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
- "--forever"
- ])
- time.sleep(1)#give sender enough time to have some messages to replay
- cluster[0].kill()
- self.sender.wait()
- self.receiver.wait()
- cluster[1].kill()
-
class BlockedSend(Thread):
"""Send a message, send is expected to block.
Verify that it does block (for a given timeout), then allow
@@ -343,7 +316,7 @@ acl allow all all
Thread.__init__(self)
def run(self):
try:
- self.sender.send(self.msg, sync=True)
+ self.sender.send(self.msg)
self.condition.acquire()
try:
self.blocked = False
@@ -375,12 +348,11 @@ acl allow all all
ssn0 = brokers.first().connect().session()
s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
brokers.first().startQmf()
- q1 = [q for q in brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
- oid = q1.getObjectId()
- self.assertEqual(q1.name, "flq")
- self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
- assert not q1.flowStopped
- self.assertEqual(q1.flowStoppedCount, 0)
+ q = [q for q in brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
+ oid = q.getObjectId()
+ self.assertEqual(q.name, "flq")
+ self.assertEqual(q.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+ assert not q.flowStopped
# fill the queue on one broker until flow control is active
for x in range(5): s0.send(Message(str(x)))
@@ -388,20 +360,18 @@ acl allow all all
sender.start() # Tests that sender does block
# Verify the broker queue goes into a flowStopped state
deadline = time.time() + 1
- while not q1.flowStopped and time.time() < deadline: q1.update()
- assert q1.flowStopped
- self.assertEqual(q1.flowStoppedCount, 1)
+ while not q.flowStopped and time.time() < deadline: q.update()
+ assert q.flowStopped
sender.assert_blocked() # Still blocked
# Now verify the both brokers in cluster have same configuration
brokers.second().startQmf()
qs = brokers.second().qmf_session.getObjects(_objectId=oid)
self.assertEqual(len(qs), 1)
- q2 = qs[0]
- self.assertEqual(q2.name, "flq")
- self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
- assert q2.flowStopped
- self.assertEqual(q2.flowStoppedCount, 1)
+ q = qs[0]
+ self.assertEqual(q.name, "flq")
+ self.assertEqual(q.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+ assert q.flowStopped
# now drain the queue using a session to the other broker
ssn1 = brokers.second().connect().session()
@@ -411,12 +381,6 @@ acl allow all all
ssn1.acknowledge()
sender.wait() # Verify no longer blocked.
- # and re-verify state of queue on both brokers
- q1.update()
- assert not q1.flowStopped
- q2.update()
- assert not q2.flowStopped
-
ssn0.connection.close()
ssn1.connection.close()
cluster_test_logs.verify_logs()
@@ -430,6 +394,7 @@ acl allow all all
self.queue_flowlimit_test(Brokers())
def test_queue_flowlimit_cluster(self):
+ return # TODO aconway 2011-02-18: disabled till fixed, QPID-2935
cluster = self.cluster(2)
class Brokers:
def first(self): return cluster[0]
@@ -437,6 +402,7 @@ acl allow all all
self.queue_flowlimit_test(Brokers())
def test_queue_flowlimit_cluster_join(self):
+ return # TODO aconway 2011-02-18: disabled till fixed, QPID-2935
cluster = self.cluster(1)
class Brokers:
def first(self): return cluster[0]
@@ -445,274 +411,6 @@ acl allow all all
return cluster[1]
self.queue_flowlimit_test(Brokers())
- def test_queue_flowlimit_replicate(self):
- """ Verify that a queue which is in flow control BUT has drained BELOW
- the flow control 'stop' threshold, is correctly replicated when a new
- broker is added to the cluster.
- """
-
- class AsyncSender(Thread):
- """Send a fixed number of msgs from a sender in a separate thread
- so it may block without blocking the test.
- """
- def __init__(self, broker, address, count=1, size=4):
- Thread.__init__(self)
- self.daemon = True
- self.broker = broker
- self.queue = address
- self.count = count
- self.size = size
- self.done = False
-
- def run(self):
- self.sender = subprocess.Popen(["qpid-send",
- "--capacity=1",
- "--content-size=%s" % self.size,
- "--messages=%s" % self.count,
- "--failover-updates",
- "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
- "--address=%s" % self.queue,
- "--broker=%s" % self.broker.host_port()])
- self.sender.wait()
- self.done = True
-
- cluster = self.cluster(2)
- # create a queue with rather draconian flow control settings
- ssn0 = cluster[0].connect().session()
- s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':100, 'qpid.flow_resume_count':20}}}}")
-
- # fire off the sending thread to broker[0], and wait until the queue
- # hits flow control on broker[1]
- sender = AsyncSender(cluster[0], "flq", count=110);
- sender.start();
-
- cluster[1].startQmf()
- q_obj = [q for q in cluster[1].qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
- deadline = time.time() + 10
- while not q_obj.flowStopped and time.time() < deadline:
- q_obj.update()
- assert q_obj.flowStopped
- assert not sender.done
- assert q_obj.msgDepth < 110
-
- # Now drain enough messages on broker[1] to drop below the flow stop
- # threshold, but not relieve flow control...
- receiver = subprocess.Popen(["qpid-receive",
- "--messages=15",
- "--timeout=1",
- "--print-content=no",
- "--failover-updates",
- "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
- "--ack-frequency=1",
- "--address=flq",
- "--broker=%s" % cluster[1].host_port()])
- receiver.wait()
- q_obj.update()
- assert q_obj.flowStopped
- assert not sender.done
- current_depth = q_obj.msgDepth
-
- # add a new broker to the cluster, and verify that the queue is in flow
- # control on that broker
- cluster.start()
- cluster[2].startQmf()
- q_obj = [q for q in cluster[2].qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
- assert q_obj.flowStopped
- assert q_obj.msgDepth == current_depth
-
- # now drain the queue on broker[2], and verify that the sender becomes
- # unblocked
- receiver = subprocess.Popen(["qpid-receive",
- "--messages=95",
- "--timeout=1",
- "--print-content=no",
- "--failover-updates",
- "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
- "--ack-frequency=1",
- "--address=flq",
- "--broker=%s" % cluster[2].host_port()])
- receiver.wait()
- q_obj.update()
- assert not q_obj.flowStopped
- self.assertEqual(q_obj.msgDepth, 0)
-
- # verify that the sender has become unblocked
- sender.join(timeout=5)
- assert not sender.isAlive()
- assert sender.done
-
- def test_blocked_queue_delete(self):
- """Verify that producers which are blocked on a queue due to flow
- control are unblocked when that queue is deleted.
- """
-
- cluster = self.cluster(2)
- cluster[0].startQmf()
- cluster[1].startQmf()
-
- # configure a queue with a specific flow limit on first broker
- ssn0 = cluster[0].connect().session()
- s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
- q1 = [q for q in cluster[0].qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
- oid = q1.getObjectId()
- self.assertEqual(q1.name, "flq")
- self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
- assert not q1.flowStopped
- self.assertEqual(q1.flowStoppedCount, 0)
-
- # fill the queue on one broker until flow control is active
- for x in range(5): s0.send(Message(str(x)))
- sender = ShortTests.BlockedSend(s0, Message(str(6)))
- sender.start() # Tests that sender does block
- # Verify the broker queue goes into a flowStopped state
- deadline = time.time() + 1
- while not q1.flowStopped and time.time() < deadline: q1.update()
- assert q1.flowStopped
- self.assertEqual(q1.flowStoppedCount, 1)
- sender.assert_blocked() # Still blocked
-
- # Now verify the both brokers in cluster have same configuration
- qs = cluster[1].qmf_session.getObjects(_objectId=oid)
- self.assertEqual(len(qs), 1)
- q2 = qs[0]
- self.assertEqual(q2.name, "flq")
- self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
- assert q2.flowStopped
- self.assertEqual(q2.flowStoppedCount, 1)
-
- # now delete the blocked queue from other broker
- ssn1 = cluster[1].connect().session()
- self.evaluate_address(ssn1, "flq;{delete:always}")
- sender.wait() # Verify no longer blocked.
-
- ssn0.connection.close()
- ssn1.connection.close()
- cluster_test_logs.verify_logs()
-
-
- def test_alternate_exchange_update(self):
- """Verify that alternate-exchange on exchanges and queues is propagated to new members of a cluster. """
- cluster = self.cluster(1)
- s0 = cluster[0].connect().session()
- # create alt queue bound to amq.fanout exchange, will be destination for alternate exchanges
- self.evaluate_address(s0, "alt;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:alt}]}}")
- # create direct exchange ex with alternate-exchange amq.fanout and no queues bound
- self.evaluate_address(s0, "ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'amq.fanout'}}}")
- # create queue q with alternate-exchange amq.fanout
- self.evaluate_address(s0, "q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'amq.fanout'}}}")
-
- def verify(broker):
- s = broker.connect().session()
- # Verify unmatched message goes to ex's alternate.
- s.sender("ex").send("foo")
- self.assertEqual("foo", s.receiver("alt").fetch(timeout=0).content)
- # Verify rejected message goes to q's alternate.
- s.sender("q").send("bar")
- msg = s.receiver("q").fetch(timeout=0)
- self.assertEqual("bar", msg.content)
- s.acknowledge(msg, Disposition(REJECTED)) # Reject the message
- self.assertEqual("bar", s.receiver("alt").fetch(timeout=0).content)
-
- verify(cluster[0])
- cluster.start()
- verify(cluster[1])
-
- def test_binding_order(self):
- """Regression test for binding order inconsistency in cluster"""
- cluster = self.cluster(1)
- c0 = cluster[0].connect()
- s0 = c0.session()
- # Declare multiple queues bound to same key on amq.topic
- def declare(q,max=0):
- if max: declare = 'x-declare:{arguments:{"qpid.max_count":%d, "qpid.flow_stop_count":0}}'%max
- else: declare = 'x-declare:{}'
- bind='x-bindings:[{queue:%s,key:key,exchange:"amq.topic"}]'%(q)
- s0.sender("%s;{create:always,node:{%s,%s}}" % (q,declare,bind))
- declare('d',max=4) # Only one with a limit
- for q in ['c', 'b','a']: declare(q)
- # Add a cluster member, send enough messages to exceed the max count
- cluster.start()
- try:
- s = s0.sender('amq.topic/key')
- for m in xrange(1,6): s.send(Message(str(m)))
- self.fail("Expected capacity exceeded exception")
- except messaging.exceptions.TargetCapacityExceeded: pass
- c1 = cluster[1].connect()
- s1 = c1.session()
- s0 = c0.session() # Old session s0 is broken by exception.
- # Verify queue contents are consistent.
- for q in ['a','b','c','d']:
- self.assertEqual(self.browse(s0, q), self.browse(s1, q))
- # Verify queue contents are "best effort"
- for q in ['a','b','c']: self.assert_browse(s1,q,[str(n) for n in xrange(1,6)])
- self.assert_browse(s1,'d',[str(n) for n in xrange(1,5)])
-
- def test_deleted_exchange(self):
- """QPID-3215: cached exchange reference can cause cluster inconsistencies
- if exchange is deleted/recreated
- Verify stand-alone case
- """
- cluster = self.cluster()
- # Verify we do not route message via an exchange that has been destroyed.
- cluster.start()
- s0 = cluster[0].connect().session()
- self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}")
- self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}")
- send0 = s0.sender("ex/foo")
- send0.send("foo")
- self.assert_browse(s0, "q", ["foo"])
- self.evaluate_address(s0, "ex;{delete:always}")
- try:
- send0.send("bar") # Should fail, exchange is deleted.
- self.fail("Expected not-found exception")
- except qpid.messaging.NotFound: pass
- self.assert_browse(cluster[0].connect().session(), "q", ["foo"])
-
- def test_deleted_exchange_inconsistent(self):
- """QPID-3215: cached exchange reference can cause cluster inconsistencies
- if exchange is deleted/recreated
-
- Verify cluster inconsistency.
- """
- cluster = self.cluster()
- cluster.start()
- s0 = cluster[0].connect().session()
- self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}")
- self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}")
- send0 = s0.sender("ex/foo")
- send0.send("foo")
- self.assert_browse(s0, "q", ["foo"])
-
- cluster.start()
- s1 = cluster[1].connect().session()
- self.evaluate_address(s0, "ex;{delete:always}")
- try:
- send0.send("bar")
- self.fail("Expected not-found exception")
- except qpid.messaging.NotFound: pass
-
- self.assert_browse(s1, "q", ["foo"])
-
-
- def test_ttl_consistent(self):
- """Ensure we don't get inconsistent errors with message that have TTL very close together"""
- messages = [ Message(str(i), ttl=i/1000.0) for i in xrange(0,1000)]
- messages.append(Message("x"))
- cluster = self.cluster(2)
- sender = cluster[0].connect().session().sender("q;{create:always}")
-
- def fetch(b):
- receiver = b.connect().session().receiver("q;{create:always}")
- while receiver.fetch().content != "x": pass
-
- for m in messages: sender.send(m, sync=False)
- for m in messages: sender.send(m, sync=False)
- fetch(cluster[0])
- fetch(cluster[1])
- for m in messages: sender.send(m, sync=False)
- cluster.start()
- fetch(cluster[2])
-
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):
@@ -725,28 +423,22 @@ class LongTests(BrokerTest):
# Original cluster will all be killed so expect exit with failure
cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
- for b in cluster: b.ready() # Wait for brokers to be ready
for b in cluster: ErrorGenerator(b)
# Start sender and receiver threads
cluster[0].declare_queue("test-queue")
- sender = NumberedSender(cluster[0], 1000) # Max queue depth
- receiver = NumberedReceiver(cluster[0], sender)
+ sender = NumberedSender(cluster[1], 1000) # Max queue depth
+ receiver = NumberedReceiver(cluster[2], sender)
receiver.start()
sender.start()
- # Wait for sender & receiver to get up and running
- retry(lambda: receiver.received > 0)
# Kill original brokers, start new ones for the duration.
endtime = time.time() + self.duration()
i = 0
while time.time() < endtime:
- sender.sender.assert_running()
- receiver.receiver.assert_running()
cluster[i].kill()
i += 1
b = cluster.start(expect=EXPECT_EXIT_FAIL)
- for b in cluster[i:]: b.ready()
ErrorGenerator(b)
time.sleep(5)
sender.stop()
@@ -777,24 +469,24 @@ class LongTests(BrokerTest):
if self.stopped: break
self.process = self.broker.test.popen(
self.cmd, expect=EXPECT_UNKNOWN)
- finally:
- self.lock.release()
- try:
- exit = self.process.wait()
+ finally: self.lock.release()
+ try: exit = self.process.wait()
except OSError, e:
- # Process may already have been killed by self.stop()
- break
+ # Seems to be a race in wait(), it throws
+ # "no such process" during test shutdown.
+ # Doesn't indicate a test error, ignore.
+ return
except Exception, e:
self.process.unexpected(
"client of %s: %s"%(self.broker.name, e))
self.lock.acquire()
try:
+ # Quit and ignore errors if stopped or expecting failure.
if self.stopped: break
if exit != 0:
self.process.unexpected(
"client of %s exit code %s"%(self.broker.name, exit))
- finally:
- self.lock.release()
+ finally: self.lock.release()
except Exception, e:
self.error = RethrownException("Error in ClientLoop.run")
@@ -816,7 +508,7 @@ class LongTests(BrokerTest):
args += ["--log-enable=trace+:management"]
# Use store if present.
if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
- cluster = self.cluster(3, args, expect=EXPECT_EXIT_FAIL) # brokers will be killed
+ cluster = self.cluster(3, args)
clients = [] # Per-broker list of clients that only connect to one broker.
mclients = [] # Management clients that connect to every broker in the cluster.
@@ -825,12 +517,10 @@ class LongTests(BrokerTest):
"""Start ordinary clients for a broker."""
cmds=[
["qpid-tool", "localhost:%s"%(broker.port())],
- ["qpid-perftest", "--count=5000", "--durable=yes",
+ ["qpid-perftest", "--count", 50000,
"--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()],
- ["qpid-txtest", "--queue-base-name", "tx-%s"%str(qpid.datatypes.uuid4()),
- "--port", broker.port()],
- ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())]
- ]
+ ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())],
+ ["testagent", "localhost", str(broker.port())] ]
clients.append([ClientLoop(broker, cmd) for cmd in cmds])
def start_mclients(broker):
@@ -839,8 +529,7 @@ class LongTests(BrokerTest):
mclients.append(ClientLoop(broker, cmd))
endtime = time.time() + self.duration()
- # For long duration, first run is a quarter of the duration.
- runtime = min(5.0, self.duration() / 3.0)
+ runtime = self.duration() / 4 # First run is longer, use quarter of duration.
alive = 0 # First live cluster member
for i in range(len(cluster)): start_clients(cluster[i])
start_mclients(cluster[alive])
@@ -851,7 +540,7 @@ class LongTests(BrokerTest):
for b in cluster[alive:]: b.ready() # Check if a broker crashed.
# Kill the first broker, expect the clients to fail.
b = cluster[alive]
- b.ready()
+ b.expect = EXPECT_EXIT_FAIL
b.kill()
# Stop the brokers clients and all the mclients.
for c in clients[alive] + mclients:
@@ -861,179 +550,26 @@ class LongTests(BrokerTest):
mclients = []
# Start another broker and clients
alive += 1
- cluster.start(expect=EXPECT_EXIT_FAIL)
- cluster[-1].ready() # Wait till its ready
+ cluster.start()
start_clients(cluster[-1])
start_mclients(cluster[alive])
for c in chain(mclients, *clients):
c.stop()
- for b in cluster[alive:]:
- b.ready() # Verify still alive
- b.kill()
+
# Verify that logs are consistent
cluster_test_logs.verify_logs()
def test_management_qmf2(self):
self.test_management(args=["--mgmt-qmf2=yes"])
- def test_connect_consistent(self):
+ def test_connect_consistent(self): # FIXME aconway 2011-01-18:
args=["--mgmt-pub-interval=1","--log-enable=trace+:management"]
cluster = self.cluster(2, args=args)
end = time.time() + self.duration()
while (time.time() < end): # Get a management interval
for i in xrange(1000): cluster[0].connect().close()
- cluster_test_logs.verify_logs()
-
- def test_flowlimit_failover(self):
- """Test fail-over during continuous send-receive with flow control
- active.
- """
-
- # Original cluster will all be killed so expect exit with failure
- cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
- for b in cluster: b.ready() # Wait for brokers to be ready
-
- # create a queue with rather draconian flow control settings
- ssn0 = cluster[0].connect().session()
- s0 = ssn0.sender("test-queue; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':2000, 'qpid.flow_resume_count':100}}}}")
-
- receiver = NumberedReceiver(cluster[0])
- receiver.start()
- senders = [NumberedSender(cluster[0]) for i in range(1,3)]
- for s in senders:
- s.start()
- # Wait for senders & receiver to get up and running
- retry(lambda: receiver.received > 2*senders)
-
- # Kill original brokers, start new ones for the duration.
- endtime = time.time() + self.duration();
- i = 0
- while time.time() < endtime:
- for s in senders: s.sender.assert_running()
- receiver.receiver.assert_running()
- for b in cluster[i:]: b.ready() # Check if any broker crashed.
- cluster[i].kill()
- i += 1
- b = cluster.start(expect=EXPECT_EXIT_FAIL)
- time.sleep(5)
- for s in senders:
- s.stop()
- receiver.stop()
- for i in range(i, len(cluster)): cluster[i].kill()
-
- def test_ttl_failover(self):
- """Test that messages with TTL don't cause problems in a cluster with failover"""
-
- class Client(StoppableThread):
-
- def __init__(self, broker):
- StoppableThread.__init__(self)
- self.connection = broker.connect(reconnect=True)
- self.auto_fetch_reconnect_urls(self.connection)
- self.session = self.connection.session()
-
- def auto_fetch_reconnect_urls(self, conn):
- """Replacment for qpid.messaging.util version which is noisy"""
- ssn = conn.session("auto-fetch-reconnect-urls")
- rcv = ssn.receiver("amq.failover")
- rcv.capacity = 10
-
- def main():
- while True:
- try:
- msg = rcv.fetch()
- qpid.messaging.util.set_reconnect_urls(conn, msg)
- ssn.acknowledge(msg, sync=False)
- except messaging.exceptions.LinkClosed: return
- except messaging.exceptions.ConnectionError: return
-
- thread = Thread(name="auto-fetch-reconnect-urls", target=main)
- thread.setDaemon(True)
- thread.start()
-
- def stop(self):
- StoppableThread.stop(self)
- self.connection.detach()
-
- class Sender(Client):
- def __init__(self, broker, address):
- Client.__init__(self, broker)
- self.sent = 0 # Number of messages _reliably_ sent.
- self.sender = self.session.sender(address, capacity=1000)
-
- def send_counted(self, ttl):
- self.sender.send(Message(str(self.sent), ttl=ttl))
- self.sent += 1
-
- def run(self):
- while not self.stopped:
- choice = random.randint(0,4)
- if choice == 0: self.send_counted(None) # No ttl
- elif choice == 1: self.send_counted(100000) # Large ttl
- else: # Small ttl, might expire
- self.sender.send(Message("", ttl=random.random()/10))
- self.sender.send(Message("z"), sync=True) # Chaser.
-
- class Receiver(Client):
-
- def __init__(self, broker, address):
- Client.__init__(self, broker)
- self.received = 0 # Number of non-empty (reliable) messages received.
- self.receiver = self.session.receiver(address, capacity=1000)
- def run(self):
- try:
- while True:
- m = self.receiver.fetch(1)
- if m.content == "z": break
- if m.content: # Ignore unreliable messages
- # Ignore duplicates
- if int(m.content) == self.received: self.received += 1
- except Exception,e: self.error = e
-
- # def test_ttl_failover
-
- # Original cluster will all be killed so expect exit with failure
- # Set small purge interval.
- cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["--queue-purge-interval=1"])
- for b in cluster: b.ready() # Wait for brokers to be ready
-
- # Python client failover produces noisy WARN logs, disable temporarily
- logger = logging.getLogger()
- log_level = logger.getEffectiveLevel()
- logger.setLevel(logging.ERROR)
- try:
- # Start sender and receiver threads
- receiver = Receiver(cluster[0], "q;{create:always}")
- receiver.start()
- sender = Sender(cluster[0], "q;{create:always}")
- sender.start()
- # Wait for sender & receiver to get up and running
- retry(lambda: receiver.received > 0)
-
- # Kill brokers in a cycle.
- endtime = time.time() + self.duration()
- runtime = min(5.0, self.duration() / 4.0)
- i = 0
- while time.time() < endtime:
- for b in cluster[i:]: b.ready() # Check if any broker crashed.
- cluster[i].kill()
- i += 1
- b = cluster.start(expect=EXPECT_EXIT_FAIL)
- b.ready()
- time.sleep(runtime)
- sender.stop()
- receiver.stop()
- for b in cluster[i:]:
- b.ready() # Check it didn't crash
- b.kill()
- self.assertEqual(sender.sent, receiver.received)
cluster_test_logs.verify_logs()
- finally:
- # Detach to avoid slow reconnect attempts during shut-down if test fails.
- sender.connection.detach()
- receiver.connection.detach()
- logger.setLevel(log_level)
class StoreTests(BrokerTest):
"""