summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-02-04 16:56:13 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-02-04 16:56:13 +0000
commitc7ed111cf813eda770866d1b62fa4eb1f83c1f7c (patch)
tree0b94d9104b146912177156216728889a69aaa571
parent5f5c8896096529aa2ec9b39397963cd96a70f4e1 (diff)
downloadqpid-python-c7ed111cf813eda770866d1b62fa4eb1f83c1f7c.tar.gz
QPID-2935: add simple flow tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1067220 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/tests/Makefile.am7
-rw-r--r--qpid/cpp/src/tests/queue_flow_limit_tests.py245
-rwxr-xr-xqpid/cpp/src/tests/run_queue_flow_limit_tests55
3 files changed, 305 insertions, 2 deletions
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index 330b87e277..9a1c9e51f6 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -310,7 +310,9 @@ TESTS_ENVIRONMENT = \
$(srcdir)/run_test
system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest
-TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests run_cli_tests replication_test dynamic_log_level_test
+TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests \
+ run_acl_tests run_cli_tests replication_test dynamic_log_level_test \
+ run_queue_flow_limit_tests
EXTRA_DIST += \
run_test vg_check \
@@ -349,7 +351,8 @@ EXTRA_DIST += \
run_test.ps1 \
start_broker.ps1 \
stop_broker.ps1 \
- topictest.ps1
+ topictest.ps1 \
+ run_queue_flow_limit_tests
check_LTLIBRARIES += libdlclose_noop.la
libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir)
diff --git a/qpid/cpp/src/tests/queue_flow_limit_tests.py b/qpid/cpp/src/tests/queue_flow_limit_tests.py
new file mode 100644
index 0000000000..9bfba28e15
--- /dev/null
+++ b/qpid/cpp/src/tests/queue_flow_limit_tests.py
@@ -0,0 +1,245 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+from qpid.testlib import TestBase010
+from qpid import datatypes, messaging
+from qpid.messaging import Message, Empty
+from threading import Thread, Lock
+from logging import getLogger
+from time import sleep
+from subprocess import Popen, PIPE
+from os import environ
+
+class QueueFlowLimitTests(TestBase010):
+
+ def _create_queue(self, name,
+ stop_count=None, resume_count=None,
+ stop_size=None, resume_size=None):
+ """ Create a queue with the given flow settings via the queue.declare
+ command.
+ """
+ args={}
+ if (stop_count is not None):
+ args["qpid.flow_stop_count"] = stop_count;
+ if (resume_count is not None):
+ args["qpid.flow_resume_count"] = resume_count;
+ if (stop_size is not None):
+ args["qpid.flow_stop_size"] = stop_size;
+ if (resume_size is not None):
+ args["qpid.flow_resume_size"] = resume_size;
+
+ self.session.queue_declare(queue=name, arguments=args)
+
+ qs = self.qmf.getObjects(_class="queue")
+ for i in qs:
+ if i.name == name:
+ # verify flow settings
+ if (stop_count is not None):
+ self.assertEqual(i.flowStopCount, stop_count)
+ if (resume_count is not None):
+ self.assertEqual(i.flowResumeCount, resume_count)
+ if (stop_size is not None):
+ self.assertEqual(i.flowStopSize, stop_size)
+ if (resume_size is not None):
+ self.assertEqual(i.flowResumeSize, resume_size)
+ self.assertFalse(i.flowStopped)
+ return i.getObjectId()
+ self.fail("Unable to create queue '%s'" % name)
+ return None
+
+
+ def _delete_queue(self, name):
+ """ Delete a named queue
+ """
+ self.session.queue_delete(queue=name)
+
+
+ def _start_qpid_send(self, queue, count, content="X", capacity=10):
+ """ Use the qpid-send client to generate traffic to a queue.
+ """
+ command = ["qpid-send",
+ "-b", "%s:%s" % (self.broker.host, self.broker.port),
+ "-a", str(queue),
+ "--messages", str(count),
+ "--content-string", str(content),
+ "--capacity", str(capacity)
+ ]
+
+ return Popen(command, stdout=PIPE)
+
+ def _start_qpid_receive(self, queue, count, timeout=5):
+ """ Use the qpid-receive client to consume from a queue.
+ Note well: prints one line of text to stdout for each consumed msg.
+ """
+ command = ["qpid-receive",
+ "-b", "%s:%s" % (self.broker.host, self.broker.port),
+ "-a", str(queue),
+ "--messages", str(count),
+ "--timeout", str(timeout),
+ "--print-content", "yes"
+ ]
+ return Popen(command, stdout=PIPE)
+
+
+
+ def test_qpid_config_cmd(self):
+ """ Test the qpid-config command's ability to configure a queue's flow
+ control thresholds.
+ """
+ tool = environ.get("QPID_CONFIG_EXEC")
+ if tool:
+ command = [tool,
+ "--broker-addr=%s:%s" % (self.broker.host, self.broker.port),
+ "add", "queue", "test01",
+ "--flow-stop-count=999",
+ "--flow-resume-count=55",
+ "--flow-stop-size=5000000",
+ "--flow-resume-size=100000"]
+ #cmd = Popen(command, stdout=PIPE)
+ cmd = Popen(command)
+ cmd.wait()
+ self.assertEqual(cmd.returncode, 0)
+
+ # now verify the settings
+ self.startQmf();
+ qs = self.qmf.getObjects(_class="queue")
+ for i in qs:
+ if i.name == "test01":
+ self.assertEqual(i.flowStopCount, 999)
+ self.assertEqual(i.flowResumeCount, 55)
+ self.assertEqual(i.flowStopSize, 5000000)
+ self.assertEqual(i.flowResumeSize, 100000)
+ self.assertFalse(i.flowStopped)
+ break;
+ self.assertEqual(i.name, "test01")
+ self._delete_queue("test01")
+
+
+ def test_flow_count(self):
+ """ Create a queue with count-based flow limit. Spawn several
+ producers which will exceed the limit. Verify limit exceeded. Consume
+ all messages. Verify flow control released.
+ """
+ self.startQmf();
+ oid = self._create_queue("test-q", stop_count=373, resume_count=229)
+
+ sndr1 = self._start_qpid_send("test-q", count=1213, content="XXX", capacity=50);
+ sndr2 = self._start_qpid_send("test-q", count=797, content="Y", capacity=13);
+ sndr3 = self._start_qpid_send("test-q", count=331, content="ZZZZZ", capacity=149);
+ totalMsgs = 1213 + 797 + 331
+
+
+ # wait until flow control is active
+ count = 0
+ while self.qmf.getObjects(_objectId=oid)[0].flowStopped == False and \
+ count < 10:
+ sleep(1);
+ count += 1;
+ self.assertTrue(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
+ depth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
+ self.assertGreater(depth, 373)
+
+ # now wait until the enqueues stop happening - ensure that
+ # not all msgs have been sent (senders are blocked)
+ sleep(1)
+ newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
+ while depth != newDepth:
+ depth = newDepth;
+ sleep(1)
+ newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
+ self.assertGreater(totalMsgs, depth)
+
+ # drain the queue
+ rcvr = self._start_qpid_receive("test-q",
+ count=totalMsgs)
+ count = 0;
+ x = rcvr.stdout.readline() # prints a line for each received msg
+ while x:
+ count += 1;
+ x = rcvr.stdout.readline()
+
+ sndr1.wait();
+ sndr2.wait();
+ sndr3.wait();
+ rcvr.wait();
+
+ self.assertEqual(count, totalMsgs)
+ self.assertFalse(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
+
+ self._delete_queue("test-q")
+
+
+ def test_flow_size(self):
+ """ Create a queue with size-based flow limit. Spawn several
+ producers which will exceed the limit. Verify limit exceeded. Consume
+ all messages. Verify flow control released.
+ """
+ self.startQmf();
+ oid = self._create_queue("test-q", stop_size=351133, resume_size=251143)
+
+ sndr1 = self._start_qpid_send("test-q", count=1699, content="X"*439, capacity=53);
+ sndr2 = self._start_qpid_send("test-q", count=1129, content="Y"*631, capacity=13);
+ sndr3 = self._start_qpid_send("test-q", count=881, content="Z"*823, capacity=149);
+ totalMsgs = 1699 + 1129 + 881
+ totalBytes = 439 + 631 + 823
+
+ # wait until flow control is active
+ count = 0
+ while self.qmf.getObjects(_objectId=oid)[0].flowStopped == False and \
+ count < 10:
+ sleep(1);
+ count += 1;
+ self.assertTrue(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
+ self.assertGreater(self.qmf.getObjects(_objectId=oid)[0].byteDepth, 351133)
+
+ # now wait until the enqueues stop happening - ensure that
+ # not all msgs have been sent (senders are blocked)
+ depth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
+ sleep(1)
+ newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
+ while depth != newDepth:
+ depth = newDepth;
+ sleep(1)
+ newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
+ self.assertGreater(totalMsgs, depth)
+
+ # drain the queue
+ rcvr = self._start_qpid_receive("test-q",
+ count=totalMsgs)
+ count = 0;
+ x = rcvr.stdout.readline() # prints a line for each received msg
+ while x:
+ count += 1;
+ x = rcvr.stdout.readline()
+
+ sndr1.wait();
+ sndr2.wait();
+ sndr3.wait();
+ rcvr.wait();
+
+ self.assertEqual(count, totalMsgs)
+ self.assertFalse(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
+
+ self._delete_queue("test-q")
+
+
+
+
diff --git a/qpid/cpp/src/tests/run_queue_flow_limit_tests b/qpid/cpp/src/tests/run_queue_flow_limit_tests
new file mode 100755
index 0000000000..9f2f093353
--- /dev/null
+++ b/qpid/cpp/src/tests/run_queue_flow_limit_tests
@@ -0,0 +1,55 @@
+#!/bin/sh
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Run tests against Queue producer flow control.
+
+source ./test_env.sh
+test -d $PYTHON_DIR || { echo "Skipping queue flow control tests, no python dir."; exit 0; }
+
+LOG_FILE=queue_flow_limit_test.log
+PORT=""
+
+trap stop_broker INT TERM QUIT
+
+error() {
+ echo $*
+ exit 1;
+}
+
+start_broker() {
+ rm -rf $LOG_FILE
+ PORT=$($QPIDD_EXEC --auth=no --no-module-dir --daemon --port=0 -t --log-to-file $LOG_FILE) || error "Could not start broker"
+}
+
+stop_broker() {
+ test -n "$PORT" && $QPIDD_EXEC --no-module-dir --quit --port $PORT
+}
+
+start_broker
+echo "Running Queue flow limit tests using broker on port $PORT"
+$QPID_PYTHON_TEST -m queue_flow_limit_tests $SKIPTESTS -b localhost:$PORT $@
+RETCODE=$?
+stop_broker
+if test x$RETCODE != x0; then
+ echo "FAIL queue flow limit tests"; exit 1;
+fi
+rm -rf $LOG_FILE
+