#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import sys from qpid.testlib import TestBase010 from qpid import datatypes, messaging from qpid.messaging import Message, Empty from threading import Thread, Lock from logging import getLogger from time import sleep, time from os import environ, popen class QueueFlowLimitTests(TestBase010): def __getattr__(self, name): if name == "assertGreater": return lambda a, b: self.failUnless(a > b) else: raise AttributeError def _create_queue(self, name, stop_count=None, resume_count=None, stop_size=None, resume_size=None, max_size=None, max_count=None): """ Create a queue with the given flow settings via the queue.declare command. """ args={} if (stop_count is not None): args["qpid.flow_stop_count"] = stop_count; if (resume_count is not None): args["qpid.flow_resume_count"] = resume_count; if (stop_size is not None): args["qpid.flow_stop_size"] = stop_size; if (resume_size is not None): args["qpid.flow_resume_size"] = resume_size; if (max_size is not None): args["qpid.max_size"] = max_size; if (max_count is not None): args["qpid.max_count"] = max_count; self.session.queue_declare(queue=name, arguments=args) qs = self.qmf.getObjects(_class="queue") for i in qs: if i.name == name: # verify flow settings if (stop_count is not None): self.assertEqual(i.arguments.get("qpid.flow_stop_count"), stop_count) if (resume_count is not None): self.assertEqual(i.arguments.get("qpid.flow_resume_count"), resume_count) if (stop_size is not None): self.assertEqual(i.arguments.get("qpid.flow_stop_size"), stop_size) if (resume_size is not None): self.assertEqual(i.arguments.get("qpid.flow_resume_size"), resume_size) if (max_size is not None): self.assertEqual(i.arguments.get("qpid.max_size"), max_size) if (max_count is not None): self.assertEqual(i.arguments.get("qpid.max_count"), max_count) self.failIf(i.flowStopped) return i.getObjectId() self.fail("Unable to create queue '%s'" % name) return None def _delete_queue(self, name): """ Delete a named queue """ self.session.queue_delete(queue=name) def _start_qpid_send(self, queue, count, content="X", capacity=100): """ Use the qpid-send client to generate traffic to a queue. """ command = "qpid-send" + \ " -b" + " %s:%s" % (self.broker.host, self.broker.port) \ + " -a " + str(queue) \ + " --messages " + str(count) \ + " --content-string " + str(content) \ + " --capacity " + str(capacity) return popen(command) def _start_qpid_receive(self, queue, count, timeout=5): """ Use the qpid-receive client to consume from a queue. Note well: prints one line of text to stdout for each consumed msg. """ command = "qpid-receive" + \ " -b " + "%s:%s" % (self.broker.host, self.broker.port) \ + " -a " + str(queue) \ + " --messages " + str(count) \ + " --timeout " + str(timeout) \ + " --print-content yes" return popen(command) def test_qpid_config_cmd(self): """ Test the qpid-config command's ability to configure a queue's flow control thresholds. """ tool = environ.get("QPID_CONFIG_EXEC") if tool: command = tool + \ " --broker-addr=%s:%s " % (self.broker.host, self.broker.port) \ + "add queue test01 --flow-stop-count=999" \ + " --flow-resume-count=55 --flow-stop-size=5000000" \ + " --flow-resume-size=100000" cmd = popen(command) rc = cmd.close() self.assertEqual(rc, None) # now verify the settings self.startQmf(); qs = self.qmf.getObjects(_class="queue") for i in qs: if i.name == "test01": self.assertEqual(i.arguments.get("qpid.flow_stop_count"), 999) self.assertEqual(i.arguments.get("qpid.flow_resume_count"), 55) self.assertEqual(i.arguments.get("qpid.flow_stop_size"), 5000000) self.assertEqual(i.arguments.get("qpid.flow_resume_size"), 100000) self.failIf(i.flowStopped) break; self.assertEqual(i.name, "test01") self._delete_queue("test01") def test_flow_count(self): """ Create a queue with count-based flow limit. Spawn several producers which will exceed the limit. Verify limit exceeded. Consume all messages. Verify flow control released. """ self.startQmf(); oid = self._create_queue("test-q", stop_count=373, resume_count=229) self.assertEqual(self.qmf.getObjects(_objectId=oid)[0].flowStoppedCount, 0) sndr1 = self._start_qpid_send("test-q", count=1213, content="XXX", capacity=50); sndr2 = self._start_qpid_send("test-q", count=797, content="Y", capacity=13); sndr3 = self._start_qpid_send("test-q", count=331, content="ZZZZZ", capacity=149); totalMsgs = 1213 + 797 + 331 # wait until flow control is active deadline = time() + 10 while (not self.qmf.getObjects(_objectId=oid)[0].flowStopped) and \ time() < deadline: pass self.failUnless(self.qmf.getObjects(_objectId=oid)[0].flowStopped) depth = self.qmf.getObjects(_objectId=oid)[0].msgDepth self.assertGreater(depth, 373) # now wait until the enqueues stop happening - ensure that # not all msgs have been sent (senders are blocked) sleep(1) newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth while depth != newDepth: depth = newDepth; sleep(1) newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth self.assertGreater(totalMsgs, depth) # drain the queue rcvr = self._start_qpid_receive("test-q", count=totalMsgs) count = 0; x = rcvr.readline() # prints a line for each received msg while x: count += 1; x = rcvr.readline() sndr1.close(); sndr2.close(); sndr3.close(); rcvr.close(); self.assertEqual(count, totalMsgs) self.failIf(self.qmf.getObjects(_objectId=oid)[0].flowStopped) self.failUnless(self.qmf.getObjects(_objectId=oid)[0].flowStoppedCount) self._delete_queue("test-q") def test_flow_size(self): """ Create a queue with size-based flow limit. Spawn several producers which will exceed the limit. Verify limit exceeded. Consume all messages. Verify flow control released. """ self.startQmf(); oid = self._create_queue("test-q", stop_size=351133, resume_size=251143) sndr1 = self._start_qpid_send("test-q", count=1699, content="X"*439, capacity=53); sndr2 = self._start_qpid_send("test-q", count=1129, content="Y"*631, capacity=13); sndr3 = self._start_qpid_send("test-q", count=881, content="Z"*823, capacity=149); totalMsgs = 1699 + 1129 + 881 # wait until flow control is active deadline = time() + 10 while (not self.qmf.getObjects(_objectId=oid)[0].flowStopped) and \ time() < deadline: pass self.failUnless(self.qmf.getObjects(_objectId=oid)[0].flowStopped) self.assertGreater(self.qmf.getObjects(_objectId=oid)[0].byteDepth, 351133) # now wait until the enqueues stop happening - ensure that # not all msgs have been sent (senders are blocked) depth = self.qmf.getObjects(_objectId=oid)[0].msgDepth sleep(1) newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth while depth != newDepth: depth = newDepth; sleep(1) newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth self.assertGreater(totalMsgs, depth) # drain the queue rcvr = self._start_qpid_receive("test-q", count=totalMsgs) count = 0; x = rcvr.readline() # prints a line for each received msg while x: count += 1; x = rcvr.readline() sndr1.close(); sndr2.close(); sndr3.close(); rcvr.close(); self.assertEqual(count, totalMsgs) self.failIf(self.qmf.getObjects(_objectId=oid)[0].flowStopped) self._delete_queue("test-q") def verify_limit(self, testq): """ run a limit check against the testq object """ testq.mgmt = self.qmf.getObjects(_objectId=testq.oid)[0] # fill up the queue, waiting until flow control is active sndr1 = self._start_qpid_send(testq.mgmt.name, count=testq.sendCount, content=testq.content) deadline = time() + 10 while (not testq.mgmt.flowStopped) and time() < deadline: testq.mgmt.update() self.failUnless(testq.verifyStopped()) # now consume enough messages to drop below the flow resume point, and # verify flow control is released. rcvr = self._start_qpid_receive(testq.mgmt.name, count=testq.consumeCount) rcvr.readlines() # prints a line for each received msg rcvr.close(); # we should now be below the resume threshold self.failUnless(testq.verifyResumed()) self._delete_queue(testq.mgmt.name) sndr1.close(); def test_default_flow_count(self): """ Create a queue with count-based size limit, and verify the computed thresholds using the broker's default ratios. """ class TestQ: def __init__(self, oid): # Use the broker-wide default flow thresholds of 80%/70% (see # run_queue_flow_limit_tests) to base the thresholds off the # queue's max_count configuration parameter # max_count == 1000 -> stop == 800, resume == 700 self.oid = oid self.sendCount = 1000 self.consumeCount = 301 # (send - resume) + 1 to reenable flow self.content = "X" def verifyStopped(self): self.mgmt.update() return self.mgmt.flowStopped and (self.mgmt.msgDepth > 800) def verifyResumed(self): self.mgmt.update() return (not self.mgmt.flowStopped) and (self.mgmt.msgDepth < 700) self.startQmf(); oid = self._create_queue("test-X", max_count=1000) self.verify_limit(TestQ(oid)) def test_default_flow_size(self): """ Create a queue with byte-based size limit, and verify the computed thresholds using the broker's default ratios. """ class TestQ: def __init__(self, oid): # Use the broker-wide default flow thresholds of 80%/70% (see # run_queue_flow_limit_tests) to base the thresholds off the # queue's max_count configuration parameter # max_size == 10000 -> stop == 8000 bytes, resume == 7000 bytes self.oid = oid self.sendCount = 2000 self.consumeCount = 601 # (send - resume) + 1 to reenable flow self.content = "XXXXX" # 5 bytes per message sent. def verifyStopped(self): self.mgmt.update() return self.mgmt.flowStopped and (self.mgmt.byteDepth > 8000) def verifyResumed(self): self.mgmt.update() return (not self.mgmt.flowStopped) and (self.mgmt.byteDepth < 7000) self.startQmf(); oid = self._create_queue("test-Y", max_size=10000) self.verify_limit(TestQ(oid)) def test_blocked_queue_delete(self): """ Verify that blocked senders are unblocked when a queue that is flow controlled is deleted. """ class BlockedSender(Thread): def __init__(self, tester, queue, count, capacity=10): self.tester = tester self.queue = queue self.count = count self.capacity = capacity Thread.__init__(self) self.done = False self.start() def run(self): # spawn qpid-send p = self.tester._start_qpid_send(self.queue, self.count, self.capacity) p.close() # waits for qpid-send to complete self.done = True self.startQmf(); oid = self._create_queue("kill-q", stop_size=10, resume_size=2) q = self.qmf.getObjects(_objectId=oid)[0] self.failIf(q.flowStopped) sender = BlockedSender(self, "kill-q", count=100) # wait for flow control deadline = time() + 10 while (not q.flowStopped) and time() < deadline: q.update() self.failUnless(q.flowStopped) self.failIf(sender.done) # sender blocked self._delete_queue("kill-q") sender.join(5) self.failIf(sender.isAlive()) self.failUnless(sender.done)