summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-05-05 20:26:09 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-05-05 20:26:09 +0000
commit9025c7bb4ec9a4968ac46659a185cf2d320db065 (patch)
treeec5bef92213ab92198ebd21c293abe463c0bd360
parent43fcc9d7f365439b7be269dd8f1307211eafd967 (diff)
downloadqpid-python-9025c7bb4ec9a4968ac46659a185cf2d320db065.tar.gz
QPID-3243: unit test to verify fix.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1099957 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/tests/queue_flow_limit_tests.py119
-rwxr-xr-xqpid/cpp/src/tests/run_queue_flow_limit_tests4
2 files changed, 96 insertions, 27 deletions
diff --git a/qpid/cpp/src/tests/queue_flow_limit_tests.py b/qpid/cpp/src/tests/queue_flow_limit_tests.py
index 51f91647fb..d761bde40d 100644
--- a/qpid/cpp/src/tests/queue_flow_limit_tests.py
+++ b/qpid/cpp/src/tests/queue_flow_limit_tests.py
@@ -37,7 +37,8 @@ class QueueFlowLimitTests(TestBase010):
def _create_queue(self, name,
stop_count=None, resume_count=None,
- stop_size=None, resume_size=None):
+ stop_size=None, resume_size=None,
+ max_size=None, max_count=None):
""" Create a queue with the given flow settings via the queue.declare
command.
"""
@@ -50,6 +51,11 @@ class QueueFlowLimitTests(TestBase010):
args["qpid.flow_stop_size"] = stop_size;
if (resume_size is not None):
args["qpid.flow_resume_size"] = resume_size;
+ if (max_size is not None):
+ args["qpid.max_size"] = max_size;
+ if (max_count is not None):
+ args["qpid.max_count"] = max_count;
+
self.session.queue_declare(queue=name, arguments=args)
@@ -65,6 +71,10 @@ class QueueFlowLimitTests(TestBase010):
self.assertEqual(i.arguments.get("qpid.flow_stop_size"), stop_size)
if (resume_size is not None):
self.assertEqual(i.arguments.get("qpid.flow_resume_size"), resume_size)
+ if (max_size is not None):
+ self.assertEqual(i.arguments.get("qpid.max_size"), max_size)
+ if (max_count is not None):
+ self.assertEqual(i.arguments.get("qpid.max_count"), max_count)
self.failIf(i.flowStopped)
return i.getObjectId()
self.fail("Unable to create queue '%s'" % name)
@@ -77,7 +87,7 @@ class QueueFlowLimitTests(TestBase010):
self.session.queue_delete(queue=name)
- def _start_qpid_send(self, queue, count, content="X", capacity=10):
+ def _start_qpid_send(self, queue, count, content="X", capacity=100):
""" Use the qpid-send client to generate traffic to a queue.
"""
command = "qpid-send" + \
@@ -129,27 +139,6 @@ class QueueFlowLimitTests(TestBase010):
self.assertEqual(i.name, "test01")
self._delete_queue("test01")
- # now verify that the default ratios are applied if max sizing is specified:
- command = tool + \
- " --broker-addr=%s:%s " % (self.broker.host, self.broker.port) \
- + "add queue test02 --max-queue-count=10000 --max-queue-size=1000000"
- cmd = popen(command)
- rc = cmd.close()
- self.assertEqual(rc, None)
-
- # now verify the settings
- qs = self.qmf.getObjects(_class="queue")
- for i in qs:
- if i.name == "test02":
- ## @todo KAG: can't get the flow size from qmf! Arrgh!
- # no way to verify...
- #self.assertEqual(i.arguments.get("qpid.flow_resume_count"), 55)
- #self.assertEqual(i.arguments.get("qpid.flow_resume_count"), 55)
- self.failIf(i.flowStopped)
- break;
- self.assertEqual(i.name, "test02")
- self._delete_queue("test02")
-
def test_flow_count(self):
""" Create a queue with count-based flow limit. Spawn several
@@ -167,7 +156,7 @@ class QueueFlowLimitTests(TestBase010):
# wait until flow control is active
deadline = time() + 10
- while self.qmf.getObjects(_objectId=oid)[0].flowStopped == False and \
+ while (not self.qmf.getObjects(_objectId=oid)[0].flowStopped) and \
time() < deadline:
pass
self.failUnless(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
@@ -217,11 +206,10 @@ class QueueFlowLimitTests(TestBase010):
sndr2 = self._start_qpid_send("test-q", count=1129, content="Y"*631, capacity=13);
sndr3 = self._start_qpid_send("test-q", count=881, content="Z"*823, capacity=149);
totalMsgs = 1699 + 1129 + 881
- totalBytes = 439 + 631 + 823
# wait until flow control is active
deadline = time() + 10
- while self.qmf.getObjects(_objectId=oid)[0].flowStopped == False and \
+ while (not self.qmf.getObjects(_objectId=oid)[0].flowStopped) and \
time() < deadline:
pass
self.failUnless(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
@@ -258,5 +246,84 @@ class QueueFlowLimitTests(TestBase010):
self._delete_queue("test-q")
+ def verify_limit(self, testq):
+ """ run a limit check against the testq object
+ """
+
+ testq.mgmt = self.qmf.getObjects(_objectId=testq.oid)[0]
+
+ # fill up the queue, waiting until flow control is active
+ sndr1 = self._start_qpid_send(testq.mgmt.name, count=testq.sendCount, content=testq.content)
+ deadline = time() + 10
+ while (not testq.mgmt.flowStopped) and time() < deadline:
+ testq.mgmt.update()
+
+ self.failUnless(testq.verifyStopped())
+
+ # now consume enough messages to drop below the flow resume point, and
+ # verify flow control is released.
+ rcvr = self._start_qpid_receive(testq.mgmt.name, count=testq.consumeCount)
+ rcvr.readlines() # prints a line for each received msg
+ rcvr.close();
+
+ # we should now be below the resume threshold
+ self.failUnless(testq.verifyResumed())
+
+ self._delete_queue(testq.mgmt.name)
+ sndr1.close();
+
+
+ def test_default_flow_count(self):
+ """ Create a queue with count-based size limit, and verify the computed
+ thresholds using the broker's default ratios.
+ """
+ class TestQ:
+ def __init__(self, oid):
+ # Use the broker-wide default flow thresholds of 80%/70% (see
+ # run_queue_flow_limit_tests) to base the thresholds off the
+ # queue's max_count configuration parameter
+ # max_count == 1000 -> stop == 800, resume == 700
+ self.oid = oid
+ self.sendCount = 1000
+ self.consumeCount = 301 # (send - resume) + 1 to reenable flow
+ self.content = "X"
+ def verifyStopped(self):
+ self.mgmt.update()
+ return self.mgmt.flowStopped and (self.mgmt.msgDepth > 800)
+ def verifyResumed(self):
+ self.mgmt.update()
+ return (not self.mgmt.flowStopped) and (self.mgmt.msgDepth < 700)
+
+ self.startQmf();
+ oid = self._create_queue("test-X", max_count=1000)
+ self.verify_limit(TestQ(oid))
+
+
+ def test_default_flow_size(self):
+ """ Create a queue with byte-based size limit, and verify the computed
+ thresholds using the broker's default ratios.
+ """
+ class TestQ:
+ def __init__(self, oid):
+ # Use the broker-wide default flow thresholds of 80%/70% (see
+ # run_queue_flow_limit_tests) to base the thresholds off the
+ # queue's max_count configuration parameter
+ # max_size == 10000 -> stop == 8000 bytes, resume == 7000 bytes
+ self.oid = oid
+ self.sendCount = 2000
+ self.consumeCount = 601 # (send - resume) + 1 to reenable flow
+ self.content = "XXXXX" # 5 bytes per message sent.
+ def verifyStopped(self):
+ self.mgmt.update()
+ return self.mgmt.flowStopped and (self.mgmt.byteDepth > 8000)
+ def verifyResumed(self):
+ self.mgmt.update()
+ return (not self.mgmt.flowStopped) and (self.mgmt.byteDepth < 7000)
+
+ self.startQmf();
+ oid = self._create_queue("test-Y", max_size=10000)
+ self.verify_limit(TestQ(oid))
+
+
diff --git a/qpid/cpp/src/tests/run_queue_flow_limit_tests b/qpid/cpp/src/tests/run_queue_flow_limit_tests
index 9f2f093353..f921cf5e7e 100755
--- a/qpid/cpp/src/tests/run_queue_flow_limit_tests
+++ b/qpid/cpp/src/tests/run_queue_flow_limit_tests
@@ -35,8 +35,10 @@ error() {
}
start_broker() {
+ # Note: if you change the DEFAULT_THRESHOLDS, you will need to update queue_flow_limit_tests.py
+ DEFAULT_THRESHOLDS="--default-flow-stop-threshold=80 --default-flow-resume-threshold=70"
rm -rf $LOG_FILE
- PORT=$($QPIDD_EXEC --auth=no --no-module-dir --daemon --port=0 -t --log-to-file $LOG_FILE) || error "Could not start broker"
+ PORT=$($QPIDD_EXEC $DEFAULT_THRESHOLDS --auth=no --no-module-dir --daemon --port=0 -t --log-to-file $LOG_FILE) || error "Could not start broker"
}
stop_broker() {