summaryrefslogtreecommitdiff
path: root/cpp/src/tests/queue_flow_limit_tests.py
blob: dec7cfb3af945799ab8c43f9f974326f53a62140 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
# 
#   http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

import sys
from qpid.testlib import TestBase010
from qpid import datatypes, messaging
from qpid.messaging import Message, Empty
from threading import Thread, Lock
from logging import getLogger
from time import sleep, time
from os import environ, popen

class QueueFlowLimitTests(TestBase010):

    def __getattr__(self, name):
        """ Supply assertGreater() when the test base class does not have it.

        Python only calls __getattr__ after normal attribute lookup has
        failed, so a native assertGreater takes precedence when one exists.

        name -- the attribute that could not be found.

        Returns a callable (a, b, msg=None) that fails the test unless a > b.
        Raises AttributeError, carrying the attribute name, for any other
        missing attribute.
        """
        if name == "assertGreater":
            def _assert_greater(a, b, msg=None):
                # report both operands so a failure can be diagnosed
                self.failUnless(a > b,
                                msg or "%r not greater than %r" % (a, b))
            return _assert_greater
        # name the missing attribute instead of raising an empty error
        raise AttributeError(name)

    def _create_queue(self, name,
                     stop_count=None, resume_count=None,
                     stop_size=None, resume_size=None,
                     max_size=None, max_count=None):
        """ Declare queue 'name' through queue.declare, passing along only
        those flow/limit settings that were supplied (i.e. are not None).

        The queue is then looked up via QMF: each supplied setting must be
        reported back unchanged and the queue must not be flow-stopped.
        Returns the QMF object id of the queue.  The test is failed if QMF
        reports no queue with that name.
        """
        candidates = [("qpid.flow_stop_count", stop_count),
                      ("qpid.flow_resume_count", resume_count),
                      ("qpid.flow_stop_size", stop_size),
                      ("qpid.flow_resume_size", resume_size),
                      ("qpid.max_size", max_size),
                      ("qpid.max_count", max_count)]
        # keep only the settings the caller actually asked for
        requested = [(key, value) for (key, value) in candidates
                     if value is not None]

        self.session.queue_declare(queue=name, arguments=dict(requested))

        for queue in self.qmf.getObjects(_class="queue"):
            if queue.name != name:
                continue
            # verify flow settings
            for key, expected in requested:
                self.assertEqual(queue.arguments.get(key), expected)
            self.failIf(queue.flowStopped)
            return queue.getObjectId()

        self.fail("Unable to create queue '%s'" % name)
        return None


    def _delete_queue(self, name):
        """ Remove the queue called 'name' by issuing queue.delete on the
        test's session.
        """
        session = self.session
        session.queue_delete(queue=name)


    def _start_qpid_send(self, queue, count, content="X", capacity=100):
        """ Launch the qpid-send client against the test broker so that it
        sends 'count' messages with body 'content' to 'queue', using the
        given sender capacity.  Returns the pipe object from popen().
        """
        broker = self.broker
        pieces = ["qpid-send",
                  " -b", " %s:%s" % (broker.host, broker.port),
                  " -a ", str(queue),
                  " --messages ", str(count),
                  " --content-string ", str(content),
                  " --capacity ", str(capacity)]
        return popen("".join(pieces))

    def _start_qpid_receive(self, queue, count, timeout=5):
        """ Launch the qpid-receive client against the test broker so that it
        consumes up to 'count' messages from 'queue' with the given timeout.
        Returns the pipe object from popen().
        Note well: the client prints one line of text to stdout for each
        consumed msg.
        """
        broker = self.broker
        pieces = ["qpid-receive",
                  " -b ", "%s:%s" % (broker.host, broker.port),
                  " -a ", str(queue),
                  " --messages ", str(count),
                  " --timeout ", str(timeout),
                  " --print-content yes"]
        return popen("".join(pieces))

    def test_qpid_config_cmd(self):
        """ Test the qpid-config command's ability to configure a queue's flow
        control thresholds.

        The tool's path is read from the QPID_CONFIG_EXEC environment
        variable; when that is unset or empty the test does nothing.
        Otherwise queue "test01" is added through the tool, its flow settings
        are read back over QMF and compared with the command line values, and
        the queue is deleted again.
        """
        tool = environ.get("QPID_CONFIG_EXEC")
        if not tool:
            return

        command = tool + \
            " --broker-addr=%s:%s " % (self.broker.host, self.broker.port) \
            + "add queue test01 --flow-stop-count=999" \
            + " --flow-resume-count=55 --flow-stop-size=5000000" \
            + " --flow-resume-size=100000"
        cmd = popen(command)
        # close() on a popen pipe returns None when the exit status was 0
        rc = cmd.close()
        self.assertEqual(rc, None)

        # now verify the settings
        self.startQmf()
        for q in self.qmf.getObjects(_class="queue"):
            if q.name == "test01":
                self.assertEqual(q.arguments.get("qpid.flow_stop_count"), 999)
                self.assertEqual(q.arguments.get("qpid.flow_resume_count"), 55)
                self.assertEqual(q.arguments.get("qpid.flow_stop_size"), 5000000)
                self.assertEqual(q.arguments.get("qpid.flow_resume_size"), 100000)
                self.failIf(q.flowStopped)
                break
        else:
            # Also reached when QMF returns no queues at all, where inspecting
            # the loop variable after the loop would raise NameError.
            self.fail("queue 'test01' was not reported by the broker")
        self._delete_queue("test01")


    def test_flow_count(self):
        """ Exercise a count-based flow limit: declare a queue with stop and
        resume counts, start three qpid-send producers whose combined output
        exceeds the stop count, check that flow control engages and holds the
        producers back, then drain everything and check that flow control has
        been released.
        """
        self.startQmf()
        oid = self._create_queue("test-q", stop_count=373, resume_count=229)

        def snapshot():
            # each call asks QMF for the current state of the queue
            return self.qmf.getObjects(_objectId=oid)[0]

        self.assertEqual(snapshot().flowStoppedCount, 0)

        senders = [
            self._start_qpid_send("test-q", count=1213, content="XXX", capacity=50),
            self._start_qpid_send("test-q", count=797, content="Y", capacity=13),
            self._start_qpid_send("test-q", count=331, content="ZZZZZ", capacity=149),
        ]
        totalMsgs = 1213 + 797 + 331

        # poll (for at most 10 seconds) until flow control is active
        deadline = time() + 10
        while True:
            if snapshot().flowStopped or time() >= deadline:
                break
        self.failUnless(snapshot().flowStopped)
        depth = snapshot().msgDepth
        self.assertGreater(depth, 373)

        # sample the depth once a second until two consecutive samples agree,
        # i.e. the enqueues have stopped, then check that not every message
        # has been sent (senders are blocked)
        while True:
            sleep(1)
            latest = snapshot().msgDepth
            if latest == depth:
                break
            depth = latest
        self.assertGreater(totalMsgs, depth)

        # drain the queue; the receiver prints a line for each received msg
        rcvr = self._start_qpid_receive("test-q",
                                        count=totalMsgs)
        received = 0
        while rcvr.readline():
            received += 1

        for sndr in senders:
            sndr.close()
        rcvr.close()

        self.assertEqual(received, totalMsgs)
        self.failIf(snapshot().flowStopped)
        self.failUnless(snapshot().flowStoppedCount)

        self._delete_queue("test-q")


    def test_flow_size(self):
        """ Exercise a size-based flow limit: declare a queue with stop and
        resume sizes, start three qpid-send producers whose combined output
        exceeds the stop size, check that flow control engages and holds the
        producers back, then drain everything and check that flow control has
        been released.
        """
        self.startQmf()
        oid = self._create_queue("test-q", stop_size=351133, resume_size=251143)

        def snapshot():
            # each call asks QMF for the current state of the queue
            return self.qmf.getObjects(_objectId=oid)[0]

        senders = [
            self._start_qpid_send("test-q", count=1699, content="X"*439, capacity=53),
            self._start_qpid_send("test-q", count=1129, content="Y"*631, capacity=13),
            self._start_qpid_send("test-q", count=881, content="Z"*823, capacity=149),
        ]
        totalMsgs = 1699 + 1129 + 881

        # poll (for at most 10 seconds) until flow control is active
        deadline = time() + 10
        while True:
            if snapshot().flowStopped or time() >= deadline:
                break
        self.failUnless(snapshot().flowStopped)
        self.assertGreater(snapshot().byteDepth, 351133)

        # sample the depth once a second until two consecutive samples agree,
        # i.e. the enqueues have stopped, then check that not every message
        # has been sent (senders are blocked)
        depth = snapshot().msgDepth
        while True:
            sleep(1)
            latest = snapshot().msgDepth
            if latest == depth:
                break
            depth = latest
        self.assertGreater(totalMsgs, depth)

        # drain the queue; the receiver prints a line for each received msg
        rcvr = self._start_qpid_receive("test-q",
                                        count=totalMsgs)
        received = 0
        while rcvr.readline():
            received += 1

        for sndr in senders:
            sndr.close()
        rcvr.close()

        self.assertEqual(received, totalMsgs)
        self.failIf(snapshot().flowStopped)

        self._delete_queue("test-q")


    def verify_limit(self, testq):
        """ Drive one stop/resume cycle against the queue described by testq.

        testq supplies: oid (QMF object id of the queue), sendCount,
        consumeCount, content, and the predicates verifyStopped() and
        verifyResumed().  The queue's QMF object is stored on testq as
        testq.mgmt before the predicates are called.  The queue is deleted
        at the end.
        """
        mgmt = self.qmf.getObjects(_objectId=testq.oid)[0]
        testq.mgmt = mgmt

        # fill up the queue, then poll (for at most 10 seconds) until flow
        # control is active
        sender = self._start_qpid_send(mgmt.name, count=testq.sendCount, content=testq.content)
        deadline = time() + 10
        while not (mgmt.flowStopped or time() >= deadline):
            mgmt.update()

        self.failUnless(testq.verifyStopped())

        # now consume enough messages to drop below the flow resume point;
        # the receiver prints a line for each received msg
        receiver = self._start_qpid_receive(mgmt.name, count=testq.consumeCount)
        receiver.readlines()
        receiver.close()

        # we should now be below the resume threshold, with flow control
        # released
        self.failUnless(testq.verifyResumed())

        # the queue is deleted before the sender's pipe is closed
        self._delete_queue(mgmt.name)
        sender.close()

    def test_default_flow_count(self):
        """ Declare a queue that only has a max_count limit and check the
        flow thresholds the broker derives from it using its default ratios.
        """
        class TestQ:
            # The thresholds come from the broker-wide default ratios of
            # 80%/70% (see run_queue_flow_limit_tests) applied to the queue's
            # max_count configuration parameter:
            # max_count == 1000 -> stop == 800, resume == 700
            def __init__(self, oid):
                self.oid = oid
                self.content = "X"
                self.sendCount = 1000
                # (send - resume) + 1 to reenable flow
                self.consumeCount = 301

            def verifyStopped(self):
                mgmt = self.mgmt
                mgmt.update()
                return mgmt.flowStopped and (mgmt.msgDepth > 800)

            def verifyResumed(self):
                mgmt = self.mgmt
                mgmt.update()
                if mgmt.flowStopped:
                    return False
                return mgmt.msgDepth < 700

        self.startQmf()
        testq = TestQ(self._create_queue("test-X", max_count=1000))
        self.verify_limit(testq)


    def test_default_flow_size(self):
        """ Declare a queue that only has a max_size (byte) limit and check
        the flow thresholds the broker derives from it using its default
        ratios.
        """
        class TestQ:
            # The thresholds come from the broker-wide default ratios of
            # 80%/70% (see run_queue_flow_limit_tests) applied to the queue's
            # max_size configuration parameter:
            # max_size == 10000 -> stop == 8000 bytes, resume == 7000 bytes
            def __init__(self, oid):
                self.oid = oid
                # 5 bytes per message sent.
                self.content = "XXXXX"
                self.sendCount = 2000
                # (send - resume) + 1 to reenable flow
                self.consumeCount = 601

            def verifyStopped(self):
                mgmt = self.mgmt
                mgmt.update()
                return mgmt.flowStopped and (mgmt.byteDepth > 8000)

            def verifyResumed(self):
                mgmt = self.mgmt
                mgmt.update()
                if mgmt.flowStopped:
                    return False
                return mgmt.byteDepth < 7000

        self.startQmf()
        testq = TestQ(self._create_queue("test-Y", max_size=10000))
        self.verify_limit(testq)


    def test_blocked_queue_delete(self):
        """ Verify that blocked senders are unblocked when a queue that is flow
        controlled is deleted.

        A background thread runs qpid-send against a queue with a very small
        size threshold.  Once QMF reports the queue as flow-stopped, the
        sender must still be running.  The queue is then deleted and the
        sender must finish within 5 seconds.
        """

        class BlockedSender(Thread):
            """ Thread that runs qpid-send to completion and then sets 'done'.
            The thread starts itself from the constructor.
            """
            def __init__(self, tester, queue, count, capacity=10):
                self.tester = tester
                self.queue = queue
                self.count = count
                self.capacity = capacity
                Thread.__init__(self)
                self.done = False
                self.start()
            def run(self):
                # spawn qpid-send; capacity must be passed by keyword because
                # the third positional parameter of _start_qpid_send is the
                # message content, not the capacity
                p = self.tester._start_qpid_send(self.queue,
                                                 self.count,
                                                 capacity=self.capacity)
                p.close()  # waits for qpid-send to complete
                self.done = True

        self.startQmf()
        oid = self._create_queue("kill-q", stop_size=10, resume_size=2)
        q = self.qmf.getObjects(_objectId=oid)[0]
        self.failIf(q.flowStopped)

        sender = BlockedSender(self, "kill-q", count=100)
        # wait (for at most 10 seconds) for flow control
        deadline = time() + 10
        while (not q.flowStopped) and time() < deadline:
            q.update()

        self.failUnless(q.flowStopped)
        self.failIf(sender.done)   # sender blocked

        self._delete_queue("kill-q")
        sender.join(5)
        self.failIf(sender.isAlive())
        self.failUnless(sender.done)