diff options
Diffstat (limited to 'cpp/src/tests/cluster_tests.py')
-rwxr-xr-x | cpp/src/tests/cluster_tests.py | 109 |
1 files changed, 108 insertions, 1 deletions
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index cbad4010b4..3e13a3ce8a 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -23,7 +23,7 @@ from qpid import datatypes, messaging from brokertest import * from qpid.harness import Skipped from qpid.messaging import Message, Empty -from threading import Thread, Lock +from threading import Thread, Lock, Condition from logging import getLogger from itertools import chain from tempfile import NamedTemporaryFile @@ -304,6 +304,113 @@ acl allow all all # Verify logs are consistent cluster_test_logs.verify_logs() + class BlockedSend(Thread): + """Send a message, send is expected to block. + Verify that it does block (for a given timeout), then allow + waiting till it unblocks when it is expected to do so.""" + def __init__(self, sender, msg): + self.sender, self.msg = sender, msg + self.blocked = True + self.condition = Condition() + self.timeout = 0.1 # Time to wait for expected results. + Thread.__init__(self) + def run(self): + try: + self.sender.send(self.msg) + self.condition.acquire() + try: + self.blocked = False + self.condition.notify() + finally: self.condition.release() + except Exception,e: print "BlockedSend exception: %s"%e + def start(self): + Thread.start(self) + time.sleep(self.timeout) + assert self.blocked # Expected to block + def assert_blocked(self): assert self.blocked + def wait(self): # Now expecting to unblock + self.condition.acquire() + try: + while self.blocked: + self.condition.wait(self.timeout) + if self.blocked: raise Exception("Timed out waiting for send to unblock") + finally: self.condition.release() + self.join() + + def queue_flowlimit_test(self, brokers): + """Verify that the queue's flowlimit configuration and state are + correctly replicated. + The brokers argument allows this test to run on single broker, + cluster of 2 pre-startd brokers or cluster where second broker + starts after queue is in flow control. + """ + # configure a queue with a specific flow limit on first broker + ssn0 = brokers.first().connect().session() + s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}") + brokers.first().startQmf() + q = [q for q in brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0] + oid = q.getObjectId() + self.assertEqual(q.name, "flq") + self.assertEqual(q.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) + assert not q.flowStopped + + # fill the queue on one broker until flow control is active + for x in range(5): s0.send(Message(str(x))) + sender = ShortTests.BlockedSend(s0, Message(str(6))) + sender.start() # Tests that sender does block + # Verify the broker queue goes into a flowStopped state + deadline = time.time() + 1 + while not q.flowStopped and time.time() < deadline: q.update() + assert q.flowStopped + sender.assert_blocked() # Still blocked + + # Now verify the both brokers in cluster have same configuration + brokers.second().startQmf() + qs = brokers.second().qmf_session.getObjects(_objectId=oid) + self.assertEqual(len(qs), 1) + q = qs[0] + self.assertEqual(q.name, "flq") + self.assertEqual(q.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) + assert q.flowStopped + + # now drain the queue using a session to the other broker + ssn1 = brokers.second().connect().session() + r1 = ssn1.receiver("flq", capacity=6) + for x in range(4): + r1.fetch(timeout=0) + ssn1.acknowledge() + sender.wait() # Verify no longer blocked. + + ssn0.connection.close() + ssn1.connection.close() + cluster_test_logs.verify_logs() + + def test_queue_flowlimit(self): + """Test flow limits on a standalone broker""" + broker = self.broker() + class Brokers: + def first(self): return broker + def second(self): return broker + self.queue_flowlimit_test(Brokers()) + + def test_queue_flowlimit_cluster(self): + return # TODO aconway 2011-02-18: disabled till fixed, QPID-2935 + cluster = self.cluster(2) + class Brokers: + def first(self): return cluster[0] + def second(self): return cluster[1] + self.queue_flowlimit_test(Brokers()) + + def test_queue_flowlimit_cluster_join(self): + return # TODO aconway 2011-02-18: disabled till fixed, QPID-2935 + cluster = self.cluster(1) + class Brokers: + def first(self): return cluster[0] + def second(self): + if len(cluster) == 1: cluster.start() + return cluster[1] + self.queue_flowlimit_test(Brokers()) + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" def duration(self): |