summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/queue_flow_limit_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/queue_flow_limit_tests.py')
-rw-r--r--qpid/cpp/src/tests/queue_flow_limit_tests.py159
1 files changed, 15 insertions, 144 deletions
diff --git a/qpid/cpp/src/tests/queue_flow_limit_tests.py b/qpid/cpp/src/tests/queue_flow_limit_tests.py
index dec7cfb3af..ac62dcdd1e 100644
--- a/qpid/cpp/src/tests/queue_flow_limit_tests.py
+++ b/qpid/cpp/src/tests/queue_flow_limit_tests.py
@@ -24,7 +24,7 @@ from qpid import datatypes, messaging
from qpid.messaging import Message, Empty
from threading import Thread, Lock
from logging import getLogger
-from time import sleep, time
+from time import sleep
from os import environ, popen
class QueueFlowLimitTests(TestBase010):
@@ -37,8 +37,7 @@ class QueueFlowLimitTests(TestBase010):
def _create_queue(self, name,
stop_count=None, resume_count=None,
- stop_size=None, resume_size=None,
- max_size=None, max_count=None):
+ stop_size=None, resume_size=None):
""" Create a queue with the given flow settings via the queue.declare
command.
"""
@@ -51,11 +50,6 @@ class QueueFlowLimitTests(TestBase010):
args["qpid.flow_stop_size"] = stop_size;
if (resume_size is not None):
args["qpid.flow_resume_size"] = resume_size;
- if (max_size is not None):
- args["qpid.max_size"] = max_size;
- if (max_count is not None):
- args["qpid.max_count"] = max_count;
-
self.session.queue_declare(queue=name, arguments=args)
@@ -71,10 +65,6 @@ class QueueFlowLimitTests(TestBase010):
self.assertEqual(i.arguments.get("qpid.flow_stop_size"), stop_size)
if (resume_size is not None):
self.assertEqual(i.arguments.get("qpid.flow_resume_size"), resume_size)
- if (max_size is not None):
- self.assertEqual(i.arguments.get("qpid.max_size"), max_size)
- if (max_count is not None):
- self.assertEqual(i.arguments.get("qpid.max_count"), max_count)
self.failIf(i.flowStopped)
return i.getObjectId()
self.fail("Unable to create queue '%s'" % name)
@@ -87,7 +77,7 @@ class QueueFlowLimitTests(TestBase010):
self.session.queue_delete(queue=name)
- def _start_qpid_send(self, queue, count, content="X", capacity=100):
+ def _start_qpid_send(self, queue, count, content="X", capacity=10):
""" Use the qpid-send client to generate traffic to a queue.
"""
command = "qpid-send" + \
@@ -147,18 +137,19 @@ class QueueFlowLimitTests(TestBase010):
"""
self.startQmf();
oid = self._create_queue("test-q", stop_count=373, resume_count=229)
- self.assertEqual(self.qmf.getObjects(_objectId=oid)[0].flowStoppedCount, 0)
sndr1 = self._start_qpid_send("test-q", count=1213, content="XXX", capacity=50);
sndr2 = self._start_qpid_send("test-q", count=797, content="Y", capacity=13);
sndr3 = self._start_qpid_send("test-q", count=331, content="ZZZZZ", capacity=149);
totalMsgs = 1213 + 797 + 331
+
# wait until flow control is active
- deadline = time() + 10
- while (not self.qmf.getObjects(_objectId=oid)[0].flowStopped) and \
- time() < deadline:
- pass
+ count = 0
+ while self.qmf.getObjects(_objectId=oid)[0].flowStopped == False and \
+ count < 10:
+ sleep(1);
+ count += 1;
self.failUnless(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
depth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
self.assertGreater(depth, 373)
@@ -189,7 +180,6 @@ class QueueFlowLimitTests(TestBase010):
self.assertEqual(count, totalMsgs)
self.failIf(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
- self.failUnless(self.qmf.getObjects(_objectId=oid)[0].flowStoppedCount)
self._delete_queue("test-q")
@@ -206,12 +196,14 @@ class QueueFlowLimitTests(TestBase010):
sndr2 = self._start_qpid_send("test-q", count=1129, content="Y"*631, capacity=13);
sndr3 = self._start_qpid_send("test-q", count=881, content="Z"*823, capacity=149);
totalMsgs = 1699 + 1129 + 881
+ totalBytes = 439 + 631 + 823
# wait until flow control is active
- deadline = time() + 10
- while (not self.qmf.getObjects(_objectId=oid)[0].flowStopped) and \
- time() < deadline:
- pass
+ count = 0
+ while self.qmf.getObjects(_objectId=oid)[0].flowStopped == False and \
+ count < 10:
+ sleep(1);
+ count += 1;
self.failUnless(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
self.assertGreater(self.qmf.getObjects(_objectId=oid)[0].byteDepth, 351133)
@@ -246,126 +238,5 @@ class QueueFlowLimitTests(TestBase010):
self._delete_queue("test-q")
- def verify_limit(self, testq):
- """ run a limit check against the testq object
- """
-
- testq.mgmt = self.qmf.getObjects(_objectId=testq.oid)[0]
-
- # fill up the queue, waiting until flow control is active
- sndr1 = self._start_qpid_send(testq.mgmt.name, count=testq.sendCount, content=testq.content)
- deadline = time() + 10
- while (not testq.mgmt.flowStopped) and time() < deadline:
- testq.mgmt.update()
-
- self.failUnless(testq.verifyStopped())
-
- # now consume enough messages to drop below the flow resume point, and
- # verify flow control is released.
- rcvr = self._start_qpid_receive(testq.mgmt.name, count=testq.consumeCount)
- rcvr.readlines() # prints a line for each received msg
- rcvr.close();
-
- # we should now be below the resume threshold
- self.failUnless(testq.verifyResumed())
-
- self._delete_queue(testq.mgmt.name)
- sndr1.close();
-
-
- def test_default_flow_count(self):
- """ Create a queue with count-based size limit, and verify the computed
- thresholds using the broker's default ratios.
- """
- class TestQ:
- def __init__(self, oid):
- # Use the broker-wide default flow thresholds of 80%/70% (see
- # run_queue_flow_limit_tests) to base the thresholds off the
- # queue's max_count configuration parameter
- # max_count == 1000 -> stop == 800, resume == 700
- self.oid = oid
- self.sendCount = 1000
- self.consumeCount = 301 # (send - resume) + 1 to reenable flow
- self.content = "X"
- def verifyStopped(self):
- self.mgmt.update()
- return self.mgmt.flowStopped and (self.mgmt.msgDepth > 800)
- def verifyResumed(self):
- self.mgmt.update()
- return (not self.mgmt.flowStopped) and (self.mgmt.msgDepth < 700)
-
- self.startQmf();
- oid = self._create_queue("test-X", max_count=1000)
- self.verify_limit(TestQ(oid))
-
-
- def test_default_flow_size(self):
- """ Create a queue with byte-based size limit, and verify the computed
- thresholds using the broker's default ratios.
- """
- class TestQ:
- def __init__(self, oid):
- # Use the broker-wide default flow thresholds of 80%/70% (see
- # run_queue_flow_limit_tests) to base the thresholds off the
- # queue's max_count configuration parameter
- # max_size == 10000 -> stop == 8000 bytes, resume == 7000 bytes
- self.oid = oid
- self.sendCount = 2000
- self.consumeCount = 601 # (send - resume) + 1 to reenable flow
- self.content = "XXXXX" # 5 bytes per message sent.
- def verifyStopped(self):
- self.mgmt.update()
- return self.mgmt.flowStopped and (self.mgmt.byteDepth > 8000)
- def verifyResumed(self):
- self.mgmt.update()
- return (not self.mgmt.flowStopped) and (self.mgmt.byteDepth < 7000)
-
- self.startQmf();
- oid = self._create_queue("test-Y", max_size=10000)
- self.verify_limit(TestQ(oid))
-
-
- def test_blocked_queue_delete(self):
- """ Verify that blocked senders are unblocked when a queue that is flow
- controlled is deleted.
- """
-
- class BlockedSender(Thread):
- def __init__(self, tester, queue, count, capacity=10):
- self.tester = tester
- self.queue = queue
- self.count = count
- self.capacity = capacity
- Thread.__init__(self)
- self.done = False
- self.start()
- def run(self):
- # spawn qpid-send
- p = self.tester._start_qpid_send(self.queue,
- self.count,
- self.capacity)
- p.close() # waits for qpid-send to complete
- self.done = True
-
- self.startQmf();
- oid = self._create_queue("kill-q", stop_size=10, resume_size=2)
- q = self.qmf.getObjects(_objectId=oid)[0]
- self.failIf(q.flowStopped)
-
- sender = BlockedSender(self, "kill-q", count=100)
- # wait for flow control
- deadline = time() + 10
- while (not q.flowStopped) and time() < deadline:
- q.update()
-
- self.failUnless(q.flowStopped)
- self.failIf(sender.done) # sender blocked
-
- self._delete_queue("kill-q")
- sender.join(5)
- self.failIf(sender.isAlive())
- self.failUnless(sender.done)
-
-