diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-04 16:56:13 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-04 16:56:13 +0000 |
commit | c7ed111cf813eda770866d1b62fa4eb1f83c1f7c (patch) | |
tree | 0b94d9104b146912177156216728889a69aaa571 | |
parent | 5f5c8896096529aa2ec9b39397963cd96a70f4e1 (diff) | |
download | qpid-python-c7ed111cf813eda770866d1b62fa4eb1f83c1f7c.tar.gz |
QPID-2935: add simple flow tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1067220 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/tests/Makefile.am | 7 | ||||
-rw-r--r-- | qpid/cpp/src/tests/queue_flow_limit_tests.py | 245 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/run_queue_flow_limit_tests | 55 |
3 files changed, 305 insertions, 2 deletions
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index 330b87e277..9a1c9e51f6 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -310,7 +310,9 @@ TESTS_ENVIRONMENT = \ $(srcdir)/run_test system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest -TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests run_cli_tests replication_test dynamic_log_level_test +TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests \ + run_acl_tests run_cli_tests replication_test dynamic_log_level_test \ + run_queue_flow_limit_tests EXTRA_DIST += \ run_test vg_check \ @@ -349,7 +351,8 @@ EXTRA_DIST += \ run_test.ps1 \ start_broker.ps1 \ stop_broker.ps1 \ - topictest.ps1 + topictest.ps1 \ + run_queue_flow_limit_tests check_LTLIBRARIES += libdlclose_noop.la libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir) diff --git a/qpid/cpp/src/tests/queue_flow_limit_tests.py b/qpid/cpp/src/tests/queue_flow_limit_tests.py new file mode 100644 index 0000000000..9bfba28e15 --- /dev/null +++ b/qpid/cpp/src/tests/queue_flow_limit_tests.py @@ -0,0 +1,245 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys +from qpid.testlib import TestBase010 +from qpid import datatypes, messaging +from qpid.messaging import Message, Empty +from threading import Thread, Lock +from logging import getLogger +from time import sleep +from subprocess import Popen, PIPE +from os import environ + +class QueueFlowLimitTests(TestBase010): + + def _create_queue(self, name, + stop_count=None, resume_count=None, + stop_size=None, resume_size=None): + """ Create a queue with the given flow settings via the queue.declare + command. + """ + args={} + if (stop_count is not None): + args["qpid.flow_stop_count"] = stop_count; + if (resume_count is not None): + args["qpid.flow_resume_count"] = resume_count; + if (stop_size is not None): + args["qpid.flow_stop_size"] = stop_size; + if (resume_size is not None): + args["qpid.flow_resume_size"] = resume_size; + + self.session.queue_declare(queue=name, arguments=args) + + qs = self.qmf.getObjects(_class="queue") + for i in qs: + if i.name == name: + # verify flow settings + if (stop_count is not None): + self.assertEqual(i.flowStopCount, stop_count) + if (resume_count is not None): + self.assertEqual(i.flowResumeCount, resume_count) + if (stop_size is not None): + self.assertEqual(i.flowStopSize, stop_size) + if (resume_size is not None): + self.assertEqual(i.flowResumeSize, resume_size) + self.assertFalse(i.flowStopped) + return i.getObjectId() + self.fail("Unable to create queue '%s'" % name) + return None + + + def _delete_queue(self, name): + """ Delete a named queue + """ + self.session.queue_delete(queue=name) + + + def _start_qpid_send(self, queue, count, content="X", capacity=10): + """ Use the qpid-send client to generate traffic to a queue. + """ + command = ["qpid-send", + "-b", "%s:%s" % (self.broker.host, self.broker.port), + "-a", str(queue), + "--messages", str(count), + "--content-string", str(content), + "--capacity", str(capacity) + ] + + return Popen(command, stdout=PIPE) + + def _start_qpid_receive(self, queue, count, timeout=5): + """ Use the qpid-receive client to consume from a queue. + Note well: prints one line of text to stdout for each consumed msg. + """ + command = ["qpid-receive", + "-b", "%s:%s" % (self.broker.host, self.broker.port), + "-a", str(queue), + "--messages", str(count), + "--timeout", str(timeout), + "--print-content", "yes" + ] + return Popen(command, stdout=PIPE) + + + + def test_qpid_config_cmd(self): + """ Test the qpid-config command's ability to configure a queue's flow + control thresholds. + """ + tool = environ.get("QPID_CONFIG_EXEC") + if tool: + command = [tool, + "--broker-addr=%s:%s" % (self.broker.host, self.broker.port), + "add", "queue", "test01", + "--flow-stop-count=999", + "--flow-resume-count=55", + "--flow-stop-size=5000000", + "--flow-resume-size=100000"] + #cmd = Popen(command, stdout=PIPE) + cmd = Popen(command) + cmd.wait() + self.assertEqual(cmd.returncode, 0) + + # now verify the settings + self.startQmf(); + qs = self.qmf.getObjects(_class="queue") + for i in qs: + if i.name == "test01": + self.assertEqual(i.flowStopCount, 999) + self.assertEqual(i.flowResumeCount, 55) + self.assertEqual(i.flowStopSize, 5000000) + self.assertEqual(i.flowResumeSize, 100000) + self.assertFalse(i.flowStopped) + break; + self.assertEqual(i.name, "test01") + self._delete_queue("test01") + + + def test_flow_count(self): + """ Create a queue with count-based flow limit. Spawn several + producers which will exceed the limit. Verify limit exceeded. Consume + all messages. Verify flow control released. + """ + self.startQmf(); + oid = self._create_queue("test-q", stop_count=373, resume_count=229) + + sndr1 = self._start_qpid_send("test-q", count=1213, content="XXX", capacity=50); + sndr2 = self._start_qpid_send("test-q", count=797, content="Y", capacity=13); + sndr3 = self._start_qpid_send("test-q", count=331, content="ZZZZZ", capacity=149); + totalMsgs = 1213 + 797 + 331 + + + # wait until flow control is active + count = 0 + while self.qmf.getObjects(_objectId=oid)[0].flowStopped == False and \ + count < 10: + sleep(1); + count += 1; + self.assertTrue(self.qmf.getObjects(_objectId=oid)[0].flowStopped) + depth = self.qmf.getObjects(_objectId=oid)[0].msgDepth + self.assertGreater(depth, 373) + + # now wait until the enqueues stop happening - ensure that + # not all msgs have been sent (senders are blocked) + sleep(1) + newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth + while depth != newDepth: + depth = newDepth; + sleep(1) + newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth + self.assertGreater(totalMsgs, depth) + + # drain the queue + rcvr = self._start_qpid_receive("test-q", + count=totalMsgs) + count = 0; + x = rcvr.stdout.readline() # prints a line for each received msg + while x: + count += 1; + x = rcvr.stdout.readline() + + sndr1.wait(); + sndr2.wait(); + sndr3.wait(); + rcvr.wait(); + + self.assertEqual(count, totalMsgs) + self.assertFalse(self.qmf.getObjects(_objectId=oid)[0].flowStopped) + + self._delete_queue("test-q") + + + def test_flow_size(self): + """ Create a queue with size-based flow limit. Spawn several + producers which will exceed the limit. Verify limit exceeded. Consume + all messages. Verify flow control released. + """ + self.startQmf(); + oid = self._create_queue("test-q", stop_size=351133, resume_size=251143) + + sndr1 = self._start_qpid_send("test-q", count=1699, content="X"*439, capacity=53); + sndr2 = self._start_qpid_send("test-q", count=1129, content="Y"*631, capacity=13); + sndr3 = self._start_qpid_send("test-q", count=881, content="Z"*823, capacity=149); + totalMsgs = 1699 + 1129 + 881 + totalBytes = 439 + 631 + 823 + + # wait until flow control is active + count = 0 + while self.qmf.getObjects(_objectId=oid)[0].flowStopped == False and \ + count < 10: + sleep(1); + count += 1; + self.assertTrue(self.qmf.getObjects(_objectId=oid)[0].flowStopped) + self.assertGreater(self.qmf.getObjects(_objectId=oid)[0].byteDepth, 351133) + + # now wait until the enqueues stop happening - ensure that + # not all msgs have been sent (senders are blocked) + depth = self.qmf.getObjects(_objectId=oid)[0].msgDepth + sleep(1) + newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth + while depth != newDepth: + depth = newDepth; + sleep(1) + newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth + self.assertGreater(totalMsgs, depth) + + # drain the queue + rcvr = self._start_qpid_receive("test-q", + count=totalMsgs) + count = 0; + x = rcvr.stdout.readline() # prints a line for each received msg + while x: + count += 1; + x = rcvr.stdout.readline() + + sndr1.wait(); + sndr2.wait(); + sndr3.wait(); + rcvr.wait(); + + self.assertEqual(count, totalMsgs) + self.assertFalse(self.qmf.getObjects(_objectId=oid)[0].flowStopped) + + self._delete_queue("test-q") + + + + diff --git a/qpid/cpp/src/tests/run_queue_flow_limit_tests b/qpid/cpp/src/tests/run_queue_flow_limit_tests new file mode 100755 index 0000000000..9f2f093353 --- /dev/null +++ b/qpid/cpp/src/tests/run_queue_flow_limit_tests @@ -0,0 +1,55 @@ +#!/bin/sh + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Run tests against Queue producer flow control. + +source ./test_env.sh +test -d $PYTHON_DIR || { echo "Skipping queue flow control tests, no python dir."; exit 0; } + +LOG_FILE=queue_flow_limit_test.log +PORT="" + +trap stop_broker INT TERM QUIT + +error() { + echo $* + exit 1; +} + +start_broker() { + rm -rf $LOG_FILE + PORT=$($QPIDD_EXEC --auth=no --no-module-dir --daemon --port=0 -t --log-to-file $LOG_FILE) || error "Could not start broker" +} + +stop_broker() { + test -n "$PORT" && $QPIDD_EXEC --no-module-dir --quit --port $PORT +} + +start_broker +echo "Running Queue flow limit tests using broker on port $PORT" +$QPID_PYTHON_TEST -m queue_flow_limit_tests $SKIPTESTS -b localhost:$PORT $@ +RETCODE=$? +stop_broker +if test x$RETCODE != x0; then + echo "FAIL queue flow limit tests"; exit 1; +fi +rm -rf $LOG_FILE + |