summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-02-18 21:17:00 +0000
committerAlan Conway <aconway@apache.org>2011-02-18 21:17:00 +0000
commitdab4acd28e8affde4c418c2d83374a84d3a97c19 (patch)
treeed6197aa845e743d862255a1f05c54ad843c1540
parent71d43de2e3e7b7919e3f81180aa1b7da17cf75ef (diff)
downloadqpid-python-dab4acd28e8affde4c418c2d83374a84d3a97c19.tar.gz
QPID-2935: Cluster flow control tests.
The tests check if send operations block when expected due to flow control and unblock as expected when messages are drained. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1072151 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py245
1 files changed, 89 insertions, 156 deletions
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 8ff83ade60..3e13a3ce8a 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -23,7 +23,7 @@ from qpid import datatypes, messaging
from brokertest import *
from qpid.harness import Skipped
from qpid.messaging import Message, Empty
-from threading import Thread, Lock
+from threading import Thread, Lock, Condition
from logging import getLogger
from itertools import chain
from tempfile import NamedTemporaryFile
@@ -304,179 +304,112 @@ acl allow all all
# Verify logs are consistent
cluster_test_logs.verify_logs()
- def test_queue_flowlimit(self):
+ class BlockedSend(Thread):
+ """Send a message, send is expected to block.
+ Verify that it does block (for a given timeout), then allow
+ waiting till it unblocks when it is expected to do so."""
+ def __init__(self, sender, msg):
+ self.sender, self.msg = sender, msg
+ self.blocked = True
+ self.condition = Condition()
+ self.timeout = 0.1 # Time to wait for expected results.
+ Thread.__init__(self)
+ def run(self):
+ try:
+ self.sender.send(self.msg)
+ self.condition.acquire()
+ try:
+ self.blocked = False
+ self.condition.notify()
+ finally: self.condition.release()
+ except Exception,e: print "BlockedSend exception: %s"%e
+ def start(self):
+ Thread.start(self)
+ time.sleep(self.timeout)
+ assert self.blocked # Expected to block
+ def assert_blocked(self): assert self.blocked
+ def wait(self): # Now expecting to unblock
+ self.condition.acquire()
+ try:
+ while self.blocked:
+ self.condition.wait(self.timeout)
+ if self.blocked: raise Exception("Timed out waiting for send to unblock")
+ finally: self.condition.release()
+ self.join()
+
+ def queue_flowlimit_test(self, brokers):
"""Verify that the queue's flowlimit configuration and state are
correctly replicated.
+ The brokers argument allows this test to run on single broker,
+ cluster of 2 pre-startd brokers or cluster where second broker
+ starts after queue is in flow control.
"""
- return; # @todo enable once flow control works in clusters
- # start a cluster of two brokers
- args = ["--log-enable=info+:broker"]
- cluster = self.cluster(2, args)
-
- # configure a queue with a specific flow limit on broker 0
- ssn0 = cluster[0].connect().session()
- s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.max-count':99, 'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
- cluster[0].startQmf()
- for q in cluster[0].qmf_session.getObjects(_class="queue"):
- if q.name == "flq":
- oid = q.getObjectId()
- break
- self.assertEqual(q.name, "flq")
- self.assertEqual(q.flowStopCount, 5)
- self.assertEqual(q.flowResumeCount, 3)
- self.assertFalse(q.flowStopped)
-
- # verify both brokers in cluster have same configuration
- cluster[1].startQmf()
- qs = cluster[1].qmf_session.getObjects(_objectId=oid)
- self.assertEqual(len(qs), 1)
- q = qs[0]
+ # configure a queue with a specific flow limit on first broker
+ ssn0 = brokers.first().connect().session()
+ s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
+ brokers.first().startQmf()
+ q = [q for q in brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
+ oid = q.getObjectId()
self.assertEqual(q.name, "flq")
- self.assertEqual(q.flowStopCount, 5)
- self.assertEqual(q.flowResumeCount, 3)
- self.assertFalse(q.flowStopped)
+ self.assertEqual(q.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+ assert not q.flowStopped
# fill the queue on one broker until flow control is active
- class BlockedSender(Thread):
- def __init__(self): Thread.__init__(self)
- def run(self):
- for x in range(6):
- s0.send(Message(str(x)))
-
- sender = BlockedSender()
- sender.start()
-
- start = time.time()
- while time.time() < start + 5:
- q = cluster[0].qmf_session.getObjects(_objectId=oid)[0]
- if q.flowStopped:
- break;
- self.assertTrue(q.flowStopped)
-
- # verify flow control is active on other broker.
- q = cluster[1].qmf_session.getObjects(_objectId=oid)[0]
- self.assertTrue(q.flowStopped)
-
- # now drain the queue using a session to the other broker
- ssn1 = cluster[1].connect().session()
- r1 = ssn1.receiver("flq", capacity=6)
- try:
- while r1.fetch(timeout=0):
- ssn1.acknowledge()
- except Empty:
- pass
- sender.join()
-
- # and verify both brokers see an unblocked queue
- q = cluster[0].qmf_session.getObjects(_objectId=oid)[0]
- self.assertFalse(q.flowStopped)
- q = cluster[1].qmf_session.getObjects(_objectId=oid)[0]
- self.assertFalse(q.flowStopped)
-
- ssn0.connection.close()
- ssn1.connection.close()
- cluster_test_logs.verify_logs()
-
-
- def test_queue_flowlimit_join(self):
- """Verify that the queue's flowlimit configuration and state are
- correctly replicated to a newly joined broker.
- """
- return; # @todo enable once flow control works in clusters
- # start a cluster of two brokers
- #args = ["--log-enable=info+:broker"]
- args = ["--log-enable=debug"]
- cluster = self.cluster(2, args)
-
- # configure a queue with a specific flow limit on broker 0
- ssn0 = cluster[0].connect().session()
- s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.max-count':99, 'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
- cluster[0].startQmf()
- for q in cluster[0].qmf_session.getObjects(_class="queue"):
- if q.name == "flq":
- oid = q.getObjectId()
- break
- self.assertEqual(q.name, "flq")
- self.assertEqual(q.flowStopCount, 5)
- self.assertEqual(q.flowResumeCount, 3)
- self.assertFalse(q.flowStopped)
-
- # verify both brokers in cluster have same configuration
- cluster[1].startQmf()
- qs = cluster[1].qmf_session.getObjects(_objectId=oid)
+ for x in range(5): s0.send(Message(str(x)))
+ sender = ShortTests.BlockedSend(s0, Message(str(6)))
+ sender.start() # Tests that sender does block
+ # Verify the broker queue goes into a flowStopped state
+ deadline = time.time() + 1
+ while not q.flowStopped and time.time() < deadline: q.update()
+ assert q.flowStopped
+ sender.assert_blocked() # Still blocked
+
+ # Now verify the both brokers in cluster have same configuration
+ brokers.second().startQmf()
+ qs = brokers.second().qmf_session.getObjects(_objectId=oid)
self.assertEqual(len(qs), 1)
q = qs[0]
self.assertEqual(q.name, "flq")
- self.assertEqual(q.flowStopCount, 5)
- self.assertEqual(q.flowResumeCount, 3)
- self.assertFalse(q.flowStopped)
-
- # fill the queue on one broker until flow control is active
- class BlockedSender(Thread):
- def __init__(self): Thread.__init__(self)
- def run(self):
- for x in range(6):
- s0.send(Message(str(x)))
-
- sender = BlockedSender()
- sender.start()
-
- start = time.time()
- while time.time() < start + 5:
- q = cluster[0].qmf_session.getObjects(_objectId=oid)[0]
- if q.flowStopped:
- break;
- self.assertTrue(q.flowStopped)
-
- # verify flow control is active on other broker.
- q = cluster[1].qmf_session.getObjects(_objectId=oid)[0]
- self.assertTrue(q.flowStopped)
-
- # add a new broker to the cluster
- print("Start")
- cluster.start()
- print("Start Done")
-
- # todo: enable verification:
- # cluster[2].startQmf()
- # qs = cluster[2].qmf_session.getObjects(_objectId=oid)
- # self.assertEqual(len(qs), 1)
- # q = qs[0]
- # self.assertEqual(q.name, "flq")
- # self.assertEqual(q.flowStopCount, 5)
- # self.assertEqual(q.flowResumeCount, 3)
- # self.assertEqual(q.msgDepth, 5)
- # self.assertFalse(q.flowStopped)
- # q = cluster[2].qmf_session.getObjects(_objectId=oid)[0]
- # self.assertTrue(q.flowStopped)
-
- # verify new member's queue config
- # verify new member's queue flow setting
-
-
-
+ self.assertEqual(q.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+ assert q.flowStopped
# now drain the queue using a session to the other broker
- ssn1 = cluster[1].connect().session()
+ ssn1 = brokers.second().connect().session()
r1 = ssn1.receiver("flq", capacity=6)
- try:
- while r1.fetch(timeout=1):
- ssn1.acknowledge()
- except Empty:
- pass
- sender.join()
-
- # and verify both brokers see an unblocked queue
- q = cluster[0].qmf_session.getObjects(_objectId=oid)[0]
- self.assertFalse(q.flowStopped)
- q = cluster[1].qmf_session.getObjects(_objectId=oid)[0]
- self.assertFalse(q.flowStopped)
+ for x in range(4):
+ r1.fetch(timeout=0)
+ ssn1.acknowledge()
+ sender.wait() # Verify no longer blocked.
ssn0.connection.close()
ssn1.connection.close()
cluster_test_logs.verify_logs()
+ def test_queue_flowlimit(self):
+ """Test flow limits on a standalone broker"""
+ broker = self.broker()
+ class Brokers:
+ def first(self): return broker
+ def second(self): return broker
+ self.queue_flowlimit_test(Brokers())
+
+ def test_queue_flowlimit_cluster(self):
+ return # TODO aconway 2011-02-18: disabled till fixed, QPID-2935
+ cluster = self.cluster(2)
+ class Brokers:
+ def first(self): return cluster[0]
+ def second(self): return cluster[1]
+ self.queue_flowlimit_test(Brokers())
+ def test_queue_flowlimit_cluster_join(self):
+ return # TODO aconway 2011-02-18: disabled till fixed, QPID-2935
+ cluster = self.cluster(1)
+ class Brokers:
+ def first(self): return cluster[0]
+ def second(self):
+ if len(cluster) == 1: cluster.start()
+ return cluster[1]
+ self.queue_flowlimit_test(Brokers())
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""