#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

import sys
from qpid.testlib import TestBase010
from qpid.messaging import Connection
from threading import Thread
from time import sleep, time
from os import environ, popen


class QueueFlowLimitTests(TestBase010):
    """
    Tests for queue flow-control thresholds.  Queues are created and
    inspected through QMF (self.qmf); traffic is generated with the
    qpid-send and qpid-receive command line clients.
    """

    # Maximum number of seconds to wait for a queue to report flowStopped.
    _timeout = 100

    def __getattr__(self, name):
        """
        Supply assertGreater when normal attribute lookup cannot find it
        (__getattr__ is only consulted after the regular lookup fails).
        Any other missing attribute raises AttributeError.
        """
        if name == "assertGreater":
            return lambda a, b: self.failUnless(a > b)
        raise AttributeError(name)

    def _get_queue(self, oid):
        """
        Return a freshly fetched QMF object for the given object id.
        """
        return self.qmf.getObjects(_objectId=oid)[0]

    def _create_queue(self, name,
                      stop_count=None, resume_count=None,
                      stop_size=None, resume_size=None,
                      max_size=None, max_count=None):
        """
        Create a queue with the given flow settings via the broker's QMF
        create method, then verify that the new queue reports those
        settings and is not flow stopped.

        Only settings that are not None are sent to the broker and
        verified.  Returns the QMF object id of the new queue; fails the
        test if the queue cannot be found after creation.
        """
        # (broker argument key, requested value), in verification order
        settings = (
            ("qpid.flow_stop_count", stop_count),
            ("qpid.flow_resume_count", resume_count),
            ("qpid.flow_stop_size", stop_size),
            ("qpid.flow_resume_size", resume_size),
            ("qpid.max_size", max_size),
            ("qpid.max_count", max_count),
        )
        args = {}
        for key, value in settings:
            if value is not None:
                args[key] = value

        broker = self.qmf.getObjects(_class="broker")[0]
        rc = broker.create("queue", name, args, True)
        self.assertEqual(rc.status, 0, rc)

        for q in self.qmf.getObjects(_class="queue"):
            if q.name != name:
                continue
            # verify flow settings
            for key, value in settings:
                if value is not None:
                    self.assertEqual(q.arguments.get(key), value)
            self.failIf(q.flowStopped)
            return q.getObjectId()

        self.fail("Unable to create queue '%s'" % name)

    def _delete_queue(self, name):
        """
        Delete a named queue via the broker's QMF delete method.
        """
        broker = self.qmf.getObjects(_class="broker")[0]
        rc = broker.delete("queue", name, {})
        self.assertEqual(rc.status, 0, rc)

    def _start_qpid_send(self, queue, count, content="X", capacity=100):
        """
        Use the qpid-send client to generate traffic to a queue.

        Returns the pipe object from popen(); closing it waits for the
        client to exit.
        """
        command = "qpid-send" + \
            " -b" + " %s:%s" % (self.broker.host, self.broker.port) \
            + " -a " + str(queue) \
            + " --messages " + str(count) \
            + " --content-string " + str(content) \
            + " --capacity " + str(capacity)
        return popen(command)

    def _start_qpid_receive(self, queue, count, timeout=5):
        """
        Use the qpid-receive client to consume from a queue.
        Note well: prints one line of text to stdout for each consumed msg.

        Returns the pipe object from popen(), from which those lines can
        be read.
        """
        command = "qpid-receive" + \
            " -b " + "%s:%s" % (self.broker.host, self.broker.port) \
            + " -a " + str(queue) \
            + " --messages " + str(count) \
            + " --timeout " + str(timeout) \
            + " --print-content yes"
        return popen(command)

    def test_qpid_config_cmd(self):
        """
        Test the qpid-config command's ability to configure a queue's flow
        control thresholds.  The test does nothing when the
        QPID_CONFIG_EXEC environment variable is not set.
        """
        tool = environ.get("QPID_CONFIG_EXEC")
        if tool:
            command = tool + \
                " --broker=%s:%s " % (self.broker.host, self.broker.port) \
                + "add queue test01 --flow-stop-count=999" \
                + " --flow-resume-count=55 --flow-stop-size=5000000" \
                + " --flow-resume-size=100000"
            cmd = popen(command)
            rc = cmd.close()
            # close() returns None when the command exited with status 0
            self.assertEqual(rc, None)

            # now verify the settings
            self.startQmf()
            for q in self.qmf.getObjects(_class="queue"):
                if q.name == "test01":
                    self.assertEqual(q.arguments.get("qpid.flow_stop_count"), 999)
                    self.assertEqual(q.arguments.get("qpid.flow_resume_count"), 55)
                    self.assertEqual(q.arguments.get("qpid.flow_stop_size"), 5000000)
                    self.assertEqual(q.arguments.get("qpid.flow_resume_size"), 100000)
                    self.failIf(q.flowStopped)
                    break
            else:
                # Also covers the case where no queues were returned at all.
                self.fail("queue 'test01' was not created by qpid-config")
            self._delete_queue("test01")

    def test_flow_count(self):
        """
        Create a queue with count-based flow limit.  Spawn several
        producers which will exceed the limit.  Verify limit exceeded.
        Consume all messages.  Verify flow control released.
        """
        self.startQmf()
        oid = self._create_queue("test-q", stop_count=373, resume_count=229)
        self.assertEqual(self._get_queue(oid).flowStoppedCount, 0)

        sndr1 = self._start_qpid_send("test-q", count=1213, content="XXX", capacity=50)
        sndr2 = self._start_qpid_send("test-q", count=797, content="Y", capacity=13)
        sndr3 = self._start_qpid_send("test-q", count=331, content="ZZZZZ", capacity=149)
        totalMsgs = 1213 + 797 + 331

        # wait until flow control is active
        deadline = time() + self._timeout
        while (not self._get_queue(oid).flowStopped) and time() < deadline:
            pass
        self.failUnless(self._get_queue(oid).flowStopped)
        depth = self._get_queue(oid).msgDepth
        self.assertGreater(depth, 373)

        # now wait until the enqueues stop happening - ensure that
        # not all msgs have been sent (senders are blocked)
        sleep(1)
        newDepth = self._get_queue(oid).msgDepth
        while depth != newDepth:
            depth = newDepth
            sleep(1)
            newDepth = self._get_queue(oid).msgDepth
        self.assertGreater(totalMsgs, depth)

        # drain the queue
        rcvr = self._start_qpid_receive("test-q", count=totalMsgs)
        count = 0
        x = rcvr.readline()    # prints a line for each received msg
        while x:
            count += 1
            x = rcvr.readline()

        sndr1.close()
        sndr2.close()
        sndr3.close()
        rcvr.close()

        self.assertEqual(count, totalMsgs)
        self.failIf(self._get_queue(oid).flowStopped)
        self.failUnless(self._get_queue(oid).flowStoppedCount)

        self._delete_queue("test-q")

    def test_flow_size(self):
        """
        Create a queue with size-based flow limit.  Spawn several
        producers which will exceed the limit.  Verify limit exceeded.
        Consume all messages.  Verify flow control released.
        """
        self.startQmf()
        oid = self._create_queue("test-q", stop_size=351133, resume_size=251143)

        sndr1 = self._start_qpid_send("test-q", count=1699, content="X"*439, capacity=53)
        sndr2 = self._start_qpid_send("test-q", count=1129, content="Y"*631, capacity=13)
        sndr3 = self._start_qpid_send("test-q", count=881, content="Z"*823, capacity=149)
        totalMsgs = 1699 + 1129 + 881

        # wait until flow control is active
        deadline = time() + self._timeout
        while (not self._get_queue(oid).flowStopped) and time() < deadline:
            pass
        self.failUnless(self._get_queue(oid).flowStopped)
        self.assertGreater(self._get_queue(oid).byteDepth, 351133)

        # now wait until the enqueues stop happening - ensure that
        # not all msgs have been sent (senders are blocked)
        depth = self._get_queue(oid).msgDepth
        sleep(1)
        newDepth = self._get_queue(oid).msgDepth
        while depth != newDepth:
            depth = newDepth
            sleep(1)
            newDepth = self._get_queue(oid).msgDepth
        self.assertGreater(totalMsgs, depth)

        # drain the queue
        rcvr = self._start_qpid_receive("test-q", count=totalMsgs)
        count = 0
        x = rcvr.readline()    # prints a line for each received msg
        while x:
            count += 1
            x = rcvr.readline()

        sndr1.close()
        sndr2.close()
        sndr3.close()
        rcvr.close()

        self.assertEqual(count, totalMsgs)
        self.failIf(self._get_queue(oid).flowStopped)

        self._delete_queue("test-q")

    def verify_limit(self, testq):
        """
        Run a limit check against the testq object.

        testq must provide: oid, sendCount, consumeCount and content
        attributes, a writable mgmt attribute (set here to the queue's QMF
        object), and verifyStopped() / verifyResumed() predicates.
        The queue is deleted at the end of the check.
        """
        testq.mgmt = self._get_queue(testq.oid)

        # fill up the queue, waiting until flow control is active
        sndr1 = self._start_qpid_send(testq.mgmt.name, count=testq.sendCount,
                                      content=testq.content)
        deadline = time() + self._timeout
        while (not testq.mgmt.flowStopped) and time() < deadline:
            testq.mgmt.update()

        self.failUnless(testq.verifyStopped())

        # now consume enough messages to drop below the flow resume point, and
        # verify flow control is released.
        rcvr = self._start_qpid_receive(testq.mgmt.name, count=testq.consumeCount)
        rcvr.readlines()    # prints a line for each received msg
        rcvr.close()

        # we should now be below the resume threshold
        self.failUnless(testq.verifyResumed())

        self._delete_queue(testq.mgmt.name)
        sndr1.close()

    def test_default_flow_count(self):
        """
        Create a queue with count-based size limit, and verify the computed
        thresholds using the broker's default ratios.
        """
        class TestQ:
            def __init__(self, oid):
                # Use the broker-wide default flow thresholds of 80%/70% (see
                # run_queue_flow_limit_tests) to base the thresholds off the
                # queue's max_count configuration parameter
                # max_count == 1000 -> stop == 800, resume == 700
                self.oid = oid
                self.sendCount = 1000
                self.consumeCount = 301  # (send - resume) + 1 to reenable flow
                self.content = "X"
                self.mgmt = None   # set by verify_limit()

            def verifyStopped(self):
                self.mgmt.update()
                return self.mgmt.flowStopped and (self.mgmt.msgDepth > 800)

            def verifyResumed(self):
                self.mgmt.update()
                return (not self.mgmt.flowStopped) and (self.mgmt.msgDepth < 700)

        self.startQmf()
        oid = self._create_queue("test-X", max_count=1000)
        self.verify_limit(TestQ(oid))

    def test_default_flow_size(self):
        """
        Create a queue with byte-based size limit, and verify the computed
        thresholds using the broker's default ratios.
        """
        class TestQ:
            def __init__(self, oid):
                # Use the broker-wide default flow thresholds of 80%/70% (see
                # run_queue_flow_limit_tests) to base the thresholds off the
                # queue's max_size configuration parameter
                # max_size == 10000 -> stop == 8000 bytes, resume == 7000 bytes
                self.oid = oid
                self.sendCount = 2000
                self.consumeCount = 601  # (send - resume) + 1 to reenable flow
                self.content = "XXXXX"   # 5 bytes per message sent.
                self.mgmt = None   # set by verify_limit()

            def verifyStopped(self):
                self.mgmt.update()
                return self.mgmt.flowStopped and (self.mgmt.byteDepth > 8000)

            def verifyResumed(self):
                self.mgmt.update()
                return (not self.mgmt.flowStopped) and (self.mgmt.byteDepth < 7000)

        self.startQmf()
        oid = self._create_queue("test-Y", max_size=10000)
        self.verify_limit(TestQ(oid))

    def test_blocked_queue_delete(self):
        """
        Verify that blocked senders are unblocked when a queue that is flow
        controlled is deleted.
        """

        class BlockedSender(Thread):
            """
            Thread that runs qpid-send to completion and then sets
            self.done.  The thread starts itself on construction.
            """
            def __init__(self, tester, queue, count, capacity=10):
                self.tester = tester
                self.queue = queue
                self.count = count
                self.capacity = capacity
                Thread.__init__(self)
                self.done = False
                self.start()

            def run(self):
                # spawn qpid-send
                # capacity must be passed by keyword: the third positional
                # parameter of _start_qpid_send is the message content.
                p = self.tester._start_qpid_send(self.queue,
                                                 self.count,
                                                 capacity=self.capacity)
                p.close()  # waits for qpid-send to complete
                self.done = True

        self.startQmf()
        oid = self._create_queue("kill-q", stop_size=10, resume_size=2)
        q = self._get_queue(oid)
        self.failIf(q.flowStopped)

        sender = BlockedSender(self, "kill-q", count=100)

        # wait for flow control
        deadline = time() + self._timeout
        while (not q.flowStopped) and time() < deadline:
            q.update()

        self.failUnless(q.flowStopped)
        self.failIf(sender.done)   # sender blocked

        self._delete_queue("kill-q")
        sender.join(5)
        self.failIf(sender.isAlive())
        self.failUnless(sender.done)