diff options
Diffstat (limited to 'qpid/cpp/src/tests/queue_flow_limit_tests.py')
-rw-r--r-- | qpid/cpp/src/tests/queue_flow_limit_tests.py | 159 |
1 files changed, 15 insertions, 144 deletions
diff --git a/qpid/cpp/src/tests/queue_flow_limit_tests.py b/qpid/cpp/src/tests/queue_flow_limit_tests.py index dec7cfb3af..ac62dcdd1e 100644 --- a/qpid/cpp/src/tests/queue_flow_limit_tests.py +++ b/qpid/cpp/src/tests/queue_flow_limit_tests.py @@ -24,7 +24,7 @@ from qpid import datatypes, messaging from qpid.messaging import Message, Empty from threading import Thread, Lock from logging import getLogger -from time import sleep, time +from time import sleep from os import environ, popen class QueueFlowLimitTests(TestBase010): @@ -37,8 +37,7 @@ class QueueFlowLimitTests(TestBase010): def _create_queue(self, name, stop_count=None, resume_count=None, - stop_size=None, resume_size=None, - max_size=None, max_count=None): + stop_size=None, resume_size=None): """ Create a queue with the given flow settings via the queue.declare command. """ @@ -51,11 +50,6 @@ class QueueFlowLimitTests(TestBase010): args["qpid.flow_stop_size"] = stop_size; if (resume_size is not None): args["qpid.flow_resume_size"] = resume_size; - if (max_size is not None): - args["qpid.max_size"] = max_size; - if (max_count is not None): - args["qpid.max_count"] = max_count; - self.session.queue_declare(queue=name, arguments=args) @@ -71,10 +65,6 @@ class QueueFlowLimitTests(TestBase010): self.assertEqual(i.arguments.get("qpid.flow_stop_size"), stop_size) if (resume_size is not None): self.assertEqual(i.arguments.get("qpid.flow_resume_size"), resume_size) - if (max_size is not None): - self.assertEqual(i.arguments.get("qpid.max_size"), max_size) - if (max_count is not None): - self.assertEqual(i.arguments.get("qpid.max_count"), max_count) self.failIf(i.flowStopped) return i.getObjectId() self.fail("Unable to create queue '%s'" % name) @@ -87,7 +77,7 @@ class QueueFlowLimitTests(TestBase010): self.session.queue_delete(queue=name) - def _start_qpid_send(self, queue, count, content="X", capacity=100): + def _start_qpid_send(self, queue, count, content="X", capacity=10): """ Use the qpid-send client to generate traffic to a queue. """ command = "qpid-send" + \ @@ -147,18 +137,19 @@ class QueueFlowLimitTests(TestBase010): """ self.startQmf(); oid = self._create_queue("test-q", stop_count=373, resume_count=229) - self.assertEqual(self.qmf.getObjects(_objectId=oid)[0].flowStoppedCount, 0) sndr1 = self._start_qpid_send("test-q", count=1213, content="XXX", capacity=50); sndr2 = self._start_qpid_send("test-q", count=797, content="Y", capacity=13); sndr3 = self._start_qpid_send("test-q", count=331, content="ZZZZZ", capacity=149); totalMsgs = 1213 + 797 + 331 + # wait until flow control is active - deadline = time() + 10 - while (not self.qmf.getObjects(_objectId=oid)[0].flowStopped) and \ - time() < deadline: - pass + count = 0 + while self.qmf.getObjects(_objectId=oid)[0].flowStopped == False and \ + count < 10: + sleep(1); + count += 1; self.failUnless(self.qmf.getObjects(_objectId=oid)[0].flowStopped) depth = self.qmf.getObjects(_objectId=oid)[0].msgDepth self.assertGreater(depth, 373) @@ -189,7 +180,6 @@ class QueueFlowLimitTests(TestBase010): self.assertEqual(count, totalMsgs) self.failIf(self.qmf.getObjects(_objectId=oid)[0].flowStopped) - self.failUnless(self.qmf.getObjects(_objectId=oid)[0].flowStoppedCount) self._delete_queue("test-q") @@ -206,12 +196,14 @@ class QueueFlowLimitTests(TestBase010): sndr2 = self._start_qpid_send("test-q", count=1129, content="Y"*631, capacity=13); sndr3 = self._start_qpid_send("test-q", count=881, content="Z"*823, capacity=149); totalMsgs = 1699 + 1129 + 881 + totalBytes = 439 + 631 + 823 # wait until flow control is active - deadline = time() + 10 - while (not self.qmf.getObjects(_objectId=oid)[0].flowStopped) and \ - time() < deadline: - pass + count = 0 + while self.qmf.getObjects(_objectId=oid)[0].flowStopped == False and \ + count < 10: + sleep(1); + count += 1; self.failUnless(self.qmf.getObjects(_objectId=oid)[0].flowStopped) self.assertGreater(self.qmf.getObjects(_objectId=oid)[0].byteDepth, 351133) @@ -246,126 +238,5 @@ class QueueFlowLimitTests(TestBase010): self._delete_queue("test-q") - def verify_limit(self, testq): - """ run a limit check against the testq object - """ - - testq.mgmt = self.qmf.getObjects(_objectId=testq.oid)[0] - - # fill up the queue, waiting until flow control is active - sndr1 = self._start_qpid_send(testq.mgmt.name, count=testq.sendCount, content=testq.content) - deadline = time() + 10 - while (not testq.mgmt.flowStopped) and time() < deadline: - testq.mgmt.update() - - self.failUnless(testq.verifyStopped()) - - # now consume enough messages to drop below the flow resume point, and - # verify flow control is released. - rcvr = self._start_qpid_receive(testq.mgmt.name, count=testq.consumeCount) - rcvr.readlines() # prints a line for each received msg - rcvr.close(); - - # we should now be below the resume threshold - self.failUnless(testq.verifyResumed()) - - self._delete_queue(testq.mgmt.name) - sndr1.close(); - - - def test_default_flow_count(self): - """ Create a queue with count-based size limit, and verify the computed - thresholds using the broker's default ratios. - """ - class TestQ: - def __init__(self, oid): - # Use the broker-wide default flow thresholds of 80%/70% (see - # run_queue_flow_limit_tests) to base the thresholds off the - # queue's max_count configuration parameter - # max_count == 1000 -> stop == 800, resume == 700 - self.oid = oid - self.sendCount = 1000 - self.consumeCount = 301 # (send - resume) + 1 to reenable flow - self.content = "X" - def verifyStopped(self): - self.mgmt.update() - return self.mgmt.flowStopped and (self.mgmt.msgDepth > 800) - def verifyResumed(self): - self.mgmt.update() - return (not self.mgmt.flowStopped) and (self.mgmt.msgDepth < 700) - - self.startQmf(); - oid = self._create_queue("test-X", max_count=1000) - self.verify_limit(TestQ(oid)) - - - def test_default_flow_size(self): - """ Create a queue with byte-based size limit, and verify the computed - thresholds using the broker's default ratios. - """ - class TestQ: - def __init__(self, oid): - # Use the broker-wide default flow thresholds of 80%/70% (see - # run_queue_flow_limit_tests) to base the thresholds off the - # queue's max_count configuration parameter - # max_size == 10000 -> stop == 8000 bytes, resume == 7000 bytes - self.oid = oid - self.sendCount = 2000 - self.consumeCount = 601 # (send - resume) + 1 to reenable flow - self.content = "XXXXX" # 5 bytes per message sent. - def verifyStopped(self): - self.mgmt.update() - return self.mgmt.flowStopped and (self.mgmt.byteDepth > 8000) - def verifyResumed(self): - self.mgmt.update() - return (not self.mgmt.flowStopped) and (self.mgmt.byteDepth < 7000) - - self.startQmf(); - oid = self._create_queue("test-Y", max_size=10000) - self.verify_limit(TestQ(oid)) - - - def test_blocked_queue_delete(self): - """ Verify that blocked senders are unblocked when a queue that is flow - controlled is deleted. - """ - - class BlockedSender(Thread): - def __init__(self, tester, queue, count, capacity=10): - self.tester = tester - self.queue = queue - self.count = count - self.capacity = capacity - Thread.__init__(self) - self.done = False - self.start() - def run(self): - # spawn qpid-send - p = self.tester._start_qpid_send(self.queue, - self.count, - self.capacity) - p.close() # waits for qpid-send to complete - self.done = True - - self.startQmf(); - oid = self._create_queue("kill-q", stop_size=10, resume_size=2) - q = self.qmf.getObjects(_objectId=oid)[0] - self.failIf(q.flowStopped) - - sender = BlockedSender(self, "kill-q", count=100) - # wait for flow control - deadline = time() + 10 - while (not q.flowStopped) and time() < deadline: - q.update() - - self.failUnless(q.flowStopped) - self.failIf(sender.done) # sender blocked - - self._delete_queue("kill-q") - sender.join(5) - self.failIf(sender.isAlive()) - self.failUnless(sender.done) - - |