diff options
-rw-r--r-- | qpid/cpp/src/tests/queue_flow_limit_tests.py | 119 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/run_queue_flow_limit_tests | 4 |
2 files changed, 96 insertions, 27 deletions
diff --git a/qpid/cpp/src/tests/queue_flow_limit_tests.py b/qpid/cpp/src/tests/queue_flow_limit_tests.py index 51f91647fb..d761bde40d 100644 --- a/qpid/cpp/src/tests/queue_flow_limit_tests.py +++ b/qpid/cpp/src/tests/queue_flow_limit_tests.py @@ -37,7 +37,8 @@ class QueueFlowLimitTests(TestBase010): def _create_queue(self, name, stop_count=None, resume_count=None, - stop_size=None, resume_size=None): + stop_size=None, resume_size=None, + max_size=None, max_count=None): """ Create a queue with the given flow settings via the queue.declare command. """ @@ -50,6 +51,11 @@ class QueueFlowLimitTests(TestBase010): args["qpid.flow_stop_size"] = stop_size; if (resume_size is not None): args["qpid.flow_resume_size"] = resume_size; + if (max_size is not None): + args["qpid.max_size"] = max_size; + if (max_count is not None): + args["qpid.max_count"] = max_count; + self.session.queue_declare(queue=name, arguments=args) @@ -65,6 +71,10 @@ class QueueFlowLimitTests(TestBase010): self.assertEqual(i.arguments.get("qpid.flow_stop_size"), stop_size) if (resume_size is not None): self.assertEqual(i.arguments.get("qpid.flow_resume_size"), resume_size) + if (max_size is not None): + self.assertEqual(i.arguments.get("qpid.max_size"), max_size) + if (max_count is not None): + self.assertEqual(i.arguments.get("qpid.max_count"), max_count) self.failIf(i.flowStopped) return i.getObjectId() self.fail("Unable to create queue '%s'" % name) @@ -77,7 +87,7 @@ class QueueFlowLimitTests(TestBase010): self.session.queue_delete(queue=name) - def _start_qpid_send(self, queue, count, content="X", capacity=10): + def _start_qpid_send(self, queue, count, content="X", capacity=100): """ Use the qpid-send client to generate traffic to a queue. """ command = "qpid-send" + \ @@ -129,27 +139,6 @@ class QueueFlowLimitTests(TestBase010): self.assertEqual(i.name, "test01") self._delete_queue("test01") - # now verify that the default ratios are applied if max sizing is specified: - command = tool + \ - " --broker-addr=%s:%s " % (self.broker.host, self.broker.port) \ - + "add queue test02 --max-queue-count=10000 --max-queue-size=1000000" - cmd = popen(command) - rc = cmd.close() - self.assertEqual(rc, None) - - # now verify the settings - qs = self.qmf.getObjects(_class="queue") - for i in qs: - if i.name == "test02": - ## @todo KAG: can't get the flow size from qmf! Arrgh! - # no way to verify... - #self.assertEqual(i.arguments.get("qpid.flow_resume_count"), 55) - #self.assertEqual(i.arguments.get("qpid.flow_resume_count"), 55) - self.failIf(i.flowStopped) - break; - self.assertEqual(i.name, "test02") - self._delete_queue("test02") - def test_flow_count(self): """ Create a queue with count-based flow limit. Spawn several @@ -167,7 +156,7 @@ class QueueFlowLimitTests(TestBase010): # wait until flow control is active deadline = time() + 10 - while self.qmf.getObjects(_objectId=oid)[0].flowStopped == False and \ + while (not self.qmf.getObjects(_objectId=oid)[0].flowStopped) and \ time() < deadline: pass self.failUnless(self.qmf.getObjects(_objectId=oid)[0].flowStopped) @@ -217,11 +206,10 @@ class QueueFlowLimitTests(TestBase010): sndr2 = self._start_qpid_send("test-q", count=1129, content="Y"*631, capacity=13); sndr3 = self._start_qpid_send("test-q", count=881, content="Z"*823, capacity=149); totalMsgs = 1699 + 1129 + 881 - totalBytes = 439 + 631 + 823 # wait until flow control is active deadline = time() + 10 - while self.qmf.getObjects(_objectId=oid)[0].flowStopped == False and \ + while (not self.qmf.getObjects(_objectId=oid)[0].flowStopped) and \ time() < deadline: pass self.failUnless(self.qmf.getObjects(_objectId=oid)[0].flowStopped) @@ -258,5 +246,84 @@ class QueueFlowLimitTests(TestBase010): self._delete_queue("test-q") + def verify_limit(self, testq): + """ run a limit check against the testq object + """ + + testq.mgmt = self.qmf.getObjects(_objectId=testq.oid)[0] + + # fill up the queue, waiting until flow control is active + sndr1 = self._start_qpid_send(testq.mgmt.name, count=testq.sendCount, content=testq.content) + deadline = time() + 10 + while (not testq.mgmt.flowStopped) and time() < deadline: + testq.mgmt.update() + + self.failUnless(testq.verifyStopped()) + + # now consume enough messages to drop below the flow resume point, and + # verify flow control is released. + rcvr = self._start_qpid_receive(testq.mgmt.name, count=testq.consumeCount) + rcvr.readlines() # prints a line for each received msg + rcvr.close(); + + # we should now be below the resume threshold + self.failUnless(testq.verifyResumed()) + + self._delete_queue(testq.mgmt.name) + sndr1.close(); + + + def test_default_flow_count(self): + """ Create a queue with count-based size limit, and verify the computed + thresholds using the broker's default ratios. + """ + class TestQ: + def __init__(self, oid): + # Use the broker-wide default flow thresholds of 80%/70% (see + # run_queue_flow_limit_tests) to base the thresholds off the + # queue's max_count configuration parameter + # max_count == 1000 -> stop == 800, resume == 700 + self.oid = oid + self.sendCount = 1000 + self.consumeCount = 301 # (send - resume) + 1 to reenable flow + self.content = "X" + def verifyStopped(self): + self.mgmt.update() + return self.mgmt.flowStopped and (self.mgmt.msgDepth > 800) + def verifyResumed(self): + self.mgmt.update() + return (not self.mgmt.flowStopped) and (self.mgmt.msgDepth < 700) + + self.startQmf(); + oid = self._create_queue("test-X", max_count=1000) + self.verify_limit(TestQ(oid)) + + + def test_default_flow_size(self): + """ Create a queue with byte-based size limit, and verify the computed + thresholds using the broker's default ratios. + """ + class TestQ: + def __init__(self, oid): + # Use the broker-wide default flow thresholds of 80%/70% (see + # run_queue_flow_limit_tests) to base the thresholds off the + # queue's max_count configuration parameter + # max_size == 10000 -> stop == 8000 bytes, resume == 7000 bytes + self.oid = oid + self.sendCount = 2000 + self.consumeCount = 601 # (send - resume) + 1 to reenable flow + self.content = "XXXXX" # 5 bytes per message sent. + def verifyStopped(self): + self.mgmt.update() + return self.mgmt.flowStopped and (self.mgmt.byteDepth > 8000) + def verifyResumed(self): + self.mgmt.update() + return (not self.mgmt.flowStopped) and (self.mgmt.byteDepth < 7000) + + self.startQmf(); + oid = self._create_queue("test-Y", max_size=10000) + self.verify_limit(TestQ(oid)) + + diff --git a/qpid/cpp/src/tests/run_queue_flow_limit_tests b/qpid/cpp/src/tests/run_queue_flow_limit_tests index 9f2f093353..f921cf5e7e 100755 --- a/qpid/cpp/src/tests/run_queue_flow_limit_tests +++ b/qpid/cpp/src/tests/run_queue_flow_limit_tests @@ -35,8 +35,10 @@ error() { } start_broker() { + # Note: if you change the DEFAULT_THRESHOLDS, you will need to update queue_flow_limit_tests.py + DEFAULT_THRESHOLDS="--default-flow-stop-threshold=80 --default-flow-resume-threshold=70" rm -rf $LOG_FILE - PORT=$($QPIDD_EXEC --auth=no --no-module-dir --daemon --port=0 -t --log-to-file $LOG_FILE) || error "Could not start broker" + PORT=$($QPIDD_EXEC $DEFAULT_THRESHOLDS --auth=no --no-module-dir --daemon --port=0 -t --log-to-file $LOG_FILE) || error "Could not start broker" } stop_broker() { |