summaryrefslogtreecommitdiff
path: root/cpp/src/tests/cluster_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/cluster_tests.py')
-rwxr-xr-xcpp/src/tests/cluster_tests.py109
1 files changed, 108 insertions, 1 deletions
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index cbad4010b4..3e13a3ce8a 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -23,7 +23,7 @@ from qpid import datatypes, messaging
from brokertest import *
from qpid.harness import Skipped
from qpid.messaging import Message, Empty
-from threading import Thread, Lock
+from threading import Thread, Lock, Condition
from logging import getLogger
from itertools import chain
from tempfile import NamedTemporaryFile
@@ -304,6 +304,113 @@ acl allow all all
# Verify logs are consistent
cluster_test_logs.verify_logs()
+ class BlockedSend(Thread):
+ """Send a message, send is expected to block.
+ Verify that it does block (for a given timeout), then allow
+ waiting till it unblocks when it is expected to do so."""
+ def __init__(self, sender, msg):
+ self.sender, self.msg = sender, msg
+ self.blocked = True
+ self.condition = Condition()
+ self.timeout = 0.1 # Time to wait for expected results.
+ Thread.__init__(self)
+ def run(self):
+ try:
+ self.sender.send(self.msg)
+ self.condition.acquire()
+ try:
+ self.blocked = False
+ self.condition.notify()
+ finally: self.condition.release()
+ except Exception,e: print "BlockedSend exception: %s"%e
+ def start(self):
+ Thread.start(self)
+ time.sleep(self.timeout)
+ assert self.blocked # Expected to block
+ def assert_blocked(self): assert self.blocked
+ def wait(self): # Now expecting to unblock
+ self.condition.acquire()
+ try:
+ while self.blocked:
+ self.condition.wait(self.timeout)
+ if self.blocked: raise Exception("Timed out waiting for send to unblock")
+ finally: self.condition.release()
+ self.join()
+
+ def queue_flowlimit_test(self, brokers):
+ """Verify that the queue's flowlimit configuration and state are
+ correctly replicated.
+ The brokers argument allows this test to run on single broker,
+ cluster of 2 pre-startd brokers or cluster where second broker
+ starts after queue is in flow control.
+ """
+ # configure a queue with a specific flow limit on first broker
+ ssn0 = brokers.first().connect().session()
+ s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
+ brokers.first().startQmf()
+ q = [q for q in brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
+ oid = q.getObjectId()
+ self.assertEqual(q.name, "flq")
+ self.assertEqual(q.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+ assert not q.flowStopped
+
+ # fill the queue on one broker until flow control is active
+ for x in range(5): s0.send(Message(str(x)))
+ sender = ShortTests.BlockedSend(s0, Message(str(6)))
+ sender.start() # Tests that sender does block
+ # Verify the broker queue goes into a flowStopped state
+ deadline = time.time() + 1
+ while not q.flowStopped and time.time() < deadline: q.update()
+ assert q.flowStopped
+ sender.assert_blocked() # Still blocked
+
+ # Now verify the both brokers in cluster have same configuration
+ brokers.second().startQmf()
+ qs = brokers.second().qmf_session.getObjects(_objectId=oid)
+ self.assertEqual(len(qs), 1)
+ q = qs[0]
+ self.assertEqual(q.name, "flq")
+ self.assertEqual(q.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+ assert q.flowStopped
+
+ # now drain the queue using a session to the other broker
+ ssn1 = brokers.second().connect().session()
+ r1 = ssn1.receiver("flq", capacity=6)
+ for x in range(4):
+ r1.fetch(timeout=0)
+ ssn1.acknowledge()
+ sender.wait() # Verify no longer blocked.
+
+ ssn0.connection.close()
+ ssn1.connection.close()
+ cluster_test_logs.verify_logs()
+
+ def test_queue_flowlimit(self):
+ """Test flow limits on a standalone broker"""
+ broker = self.broker()
+ class Brokers:
+ def first(self): return broker
+ def second(self): return broker
+ self.queue_flowlimit_test(Brokers())
+
+ def test_queue_flowlimit_cluster(self):
+ return # TODO aconway 2011-02-18: disabled till fixed, QPID-2935
+ cluster = self.cluster(2)
+ class Brokers:
+ def first(self): return cluster[0]
+ def second(self): return cluster[1]
+ self.queue_flowlimit_test(Brokers())
+
+ def test_queue_flowlimit_cluster_join(self):
+ return # TODO aconway 2011-02-18: disabled till fixed, QPID-2935
+ cluster = self.cluster(1)
+ class Brokers:
+ def first(self): return cluster[0]
+ def second(self):
+ if len(cluster) == 1: cluster.start()
+ return cluster[1]
+ self.queue_flowlimit_test(Brokers())
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):