diff options
author | Alan Conway <aconway@apache.org> | 2011-02-18 21:17:00 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-02-18 21:17:00 +0000 |
commit | dab4acd28e8affde4c418c2d83374a84d3a97c19 (patch) | |
tree | ed6197aa845e743d862255a1f05c54ad843c1540 | |
parent | 71d43de2e3e7b7919e3f81180aa1b7da17cf75ef (diff) | |
download | qpid-python-dab4acd28e8affde4c418c2d83374a84d3a97c19.tar.gz |
QPID-2935: Cluster flow control tests.
The tests check if send operations block when expected due to flow
control and unblock as expected when messages are drained.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1072151 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 245 |
1 files changed, 89 insertions, 156 deletions
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 8ff83ade60..3e13a3ce8a 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -23,7 +23,7 @@ from qpid import datatypes, messaging from brokertest import * from qpid.harness import Skipped from qpid.messaging import Message, Empty -from threading import Thread, Lock +from threading import Thread, Lock, Condition from logging import getLogger from itertools import chain from tempfile import NamedTemporaryFile @@ -304,179 +304,112 @@ acl allow all all # Verify logs are consistent cluster_test_logs.verify_logs() - def test_queue_flowlimit(self): + class BlockedSend(Thread): + """Send a message, send is expected to block. + Verify that it does block (for a given timeout), then allow + waiting till it unblocks when it is expected to do so.""" + def __init__(self, sender, msg): + self.sender, self.msg = sender, msg + self.blocked = True + self.condition = Condition() + self.timeout = 0.1 # Time to wait for expected results. + Thread.__init__(self) + def run(self): + try: + self.sender.send(self.msg) + self.condition.acquire() + try: + self.blocked = False + self.condition.notify() + finally: self.condition.release() + except Exception,e: print "BlockedSend exception: %s"%e + def start(self): + Thread.start(self) + time.sleep(self.timeout) + assert self.blocked # Expected to block + def assert_blocked(self): assert self.blocked + def wait(self): # Now expecting to unblock + self.condition.acquire() + try: + while self.blocked: + self.condition.wait(self.timeout) + if self.blocked: raise Exception("Timed out waiting for send to unblock") + finally: self.condition.release() + self.join() + + def queue_flowlimit_test(self, brokers): """Verify that the queue's flowlimit configuration and state are correctly replicated. + The brokers argument allows this test to run on single broker, + cluster of 2 pre-startd brokers or cluster where second broker + starts after queue is in flow control. """ - return; # @todo enable once flow control works in clusters - # start a cluster of two brokers - args = ["--log-enable=info+:broker"] - cluster = self.cluster(2, args) - - # configure a queue with a specific flow limit on broker 0 - ssn0 = cluster[0].connect().session() - s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.max-count':99, 'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}") - cluster[0].startQmf() - for q in cluster[0].qmf_session.getObjects(_class="queue"): - if q.name == "flq": - oid = q.getObjectId() - break - self.assertEqual(q.name, "flq") - self.assertEqual(q.flowStopCount, 5) - self.assertEqual(q.flowResumeCount, 3) - self.assertFalse(q.flowStopped) - - # verify both brokers in cluster have same configuration - cluster[1].startQmf() - qs = cluster[1].qmf_session.getObjects(_objectId=oid) - self.assertEqual(len(qs), 1) - q = qs[0] + # configure a queue with a specific flow limit on first broker + ssn0 = brokers.first().connect().session() + s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}") + brokers.first().startQmf() + q = [q for q in brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0] + oid = q.getObjectId() self.assertEqual(q.name, "flq") - self.assertEqual(q.flowStopCount, 5) - self.assertEqual(q.flowResumeCount, 3) - self.assertFalse(q.flowStopped) + self.assertEqual(q.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) + assert not q.flowStopped # fill the queue on one broker until flow control is active - class BlockedSender(Thread): - def __init__(self): Thread.__init__(self) - def run(self): - for x in range(6): - s0.send(Message(str(x))) - - sender = BlockedSender() - sender.start() - - start = time.time() - while time.time() < start + 5: - q = cluster[0].qmf_session.getObjects(_objectId=oid)[0] - if q.flowStopped: - break; - self.assertTrue(q.flowStopped) - - # verify flow control is active on other broker. - q = cluster[1].qmf_session.getObjects(_objectId=oid)[0] - self.assertTrue(q.flowStopped) - - # now drain the queue using a session to the other broker - ssn1 = cluster[1].connect().session() - r1 = ssn1.receiver("flq", capacity=6) - try: - while r1.fetch(timeout=0): - ssn1.acknowledge() - except Empty: - pass - sender.join() - - # and verify both brokers see an unblocked queue - q = cluster[0].qmf_session.getObjects(_objectId=oid)[0] - self.assertFalse(q.flowStopped) - q = cluster[1].qmf_session.getObjects(_objectId=oid)[0] - self.assertFalse(q.flowStopped) - - ssn0.connection.close() - ssn1.connection.close() - cluster_test_logs.verify_logs() - - - def test_queue_flowlimit_join(self): - """Verify that the queue's flowlimit configuration and state are - correctly replicated to a newly joined broker. - """ - return; # @todo enable once flow control works in clusters - # start a cluster of two brokers - #args = ["--log-enable=info+:broker"] - args = ["--log-enable=debug"] - cluster = self.cluster(2, args) - - # configure a queue with a specific flow limit on broker 0 - ssn0 = cluster[0].connect().session() - s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.max-count':99, 'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}") - cluster[0].startQmf() - for q in cluster[0].qmf_session.getObjects(_class="queue"): - if q.name == "flq": - oid = q.getObjectId() - break - self.assertEqual(q.name, "flq") - self.assertEqual(q.flowStopCount, 5) - self.assertEqual(q.flowResumeCount, 3) - self.assertFalse(q.flowStopped) - - # verify both brokers in cluster have same configuration - cluster[1].startQmf() - qs = cluster[1].qmf_session.getObjects(_objectId=oid) + for x in range(5): s0.send(Message(str(x))) + sender = ShortTests.BlockedSend(s0, Message(str(6))) + sender.start() # Tests that sender does block + # Verify the broker queue goes into a flowStopped state + deadline = time.time() + 1 + while not q.flowStopped and time.time() < deadline: q.update() + assert q.flowStopped + sender.assert_blocked() # Still blocked + + # Now verify the both brokers in cluster have same configuration + brokers.second().startQmf() + qs = brokers.second().qmf_session.getObjects(_objectId=oid) self.assertEqual(len(qs), 1) q = qs[0] self.assertEqual(q.name, "flq") - self.assertEqual(q.flowStopCount, 5) - self.assertEqual(q.flowResumeCount, 3) - self.assertFalse(q.flowStopped) - - # fill the queue on one broker until flow control is active - class BlockedSender(Thread): - def __init__(self): Thread.__init__(self) - def run(self): - for x in range(6): - s0.send(Message(str(x))) - - sender = BlockedSender() - sender.start() - - start = time.time() - while time.time() < start + 5: - q = cluster[0].qmf_session.getObjects(_objectId=oid)[0] - if q.flowStopped: - break; - self.assertTrue(q.flowStopped) - - # verify flow control is active on other broker. - q = cluster[1].qmf_session.getObjects(_objectId=oid)[0] - self.assertTrue(q.flowStopped) - - # add a new broker to the cluster - print("Start") - cluster.start() - print("Start Done") - - # todo: enable verification: - # cluster[2].startQmf() - # qs = cluster[2].qmf_session.getObjects(_objectId=oid) - # self.assertEqual(len(qs), 1) - # q = qs[0] - # self.assertEqual(q.name, "flq") - # self.assertEqual(q.flowStopCount, 5) - # self.assertEqual(q.flowResumeCount, 3) - # self.assertEqual(q.msgDepth, 5) - # self.assertFalse(q.flowStopped) - # q = cluster[2].qmf_session.getObjects(_objectId=oid)[0] - # self.assertTrue(q.flowStopped) - - # verify new member's queue config - # verify new member's queue flow setting - - - + self.assertEqual(q.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) + assert q.flowStopped # now drain the queue using a session to the other broker - ssn1 = cluster[1].connect().session() + ssn1 = brokers.second().connect().session() r1 = ssn1.receiver("flq", capacity=6) - try: - while r1.fetch(timeout=1): - ssn1.acknowledge() - except Empty: - pass - sender.join() - - # and verify both brokers see an unblocked queue - q = cluster[0].qmf_session.getObjects(_objectId=oid)[0] - self.assertFalse(q.flowStopped) - q = cluster[1].qmf_session.getObjects(_objectId=oid)[0] - self.assertFalse(q.flowStopped) + for x in range(4): + r1.fetch(timeout=0) + ssn1.acknowledge() + sender.wait() # Verify no longer blocked. ssn0.connection.close() ssn1.connection.close() cluster_test_logs.verify_logs() + def test_queue_flowlimit(self): + """Test flow limits on a standalone broker""" + broker = self.broker() + class Brokers: + def first(self): return broker + def second(self): return broker + self.queue_flowlimit_test(Brokers()) + + def test_queue_flowlimit_cluster(self): + return # TODO aconway 2011-02-18: disabled till fixed, QPID-2935 + cluster = self.cluster(2) + class Brokers: + def first(self): return cluster[0] + def second(self): return cluster[1] + self.queue_flowlimit_test(Brokers()) + def test_queue_flowlimit_cluster_join(self): + return # TODO aconway 2011-02-18: disabled till fixed, QPID-2935 + cluster = self.cluster(1) + class Brokers: + def first(self): return cluster[0] + def second(self): + if len(cluster) == 1: cluster.start() + return cluster[1] + self.queue_flowlimit_test(Brokers()) class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" |