diff options
Diffstat (limited to 'qpid/tests/src/py/qpid_tests/broker_0_10/priority.py')
-rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_0_10/priority.py | 243 |
1 files changed, 243 insertions, 0 deletions
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py b/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py new file mode 100644 index 0000000000..6a60add97e --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py @@ -0,0 +1,243 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.messaging import * +from qpid.tests.messaging import Base +from qpid.compat import set +import math + +class PriorityTests (Base): + """ + Test prioritised messaging + """ + + def setup_connection(self): + return Connection.establish(self.broker, **self.connection_options()) + + def setup_session(self): + return self.conn.session() + + def prioritised_delivery(self, priorities, levels=10, key="x-qpid-priorities"): + """ + Test that message on a queue are delivered in priority order. + """ + msgs = [Message(content=str(uuid4()), priority = p) for p in priorities] + + snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{'%s':%s}}}}" % (key, levels), + durable=self.durable()) + for m in msgs: snd.send(m) + + rcv = self.ssn.receiver(snd.target) + for expected in sorted_(msgs, key=lambda m: priority_level(m.priority,levels), reverse=True): + msg = rcv.fetch(0) + #print "expected priority %s got %s" % (expected.priority, msg.priority) + assert msg.content == expected.content + self.ssn.acknowledge(msg) + + def fairshare_delivery(self, priorities, default_limit=5, limits=None, levels=10, level_key="x-qpid-priorities", fairshare_key="x-qpid-fairshare"): + msgs = [Message(content=str(uuid4()), priority = p) for p in priorities] + + limit_policy = "'%s':%s" % (fairshare_key, default_limit) + if limits: + for k, v in limits.items(): + limit_policy += ", '%s-%s':%s" % (fairshare_key, k, v) + + snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{'%s':%s, %s}}}}" + % (level_key, levels, limit_policy), + durable=self.durable()) + for m in msgs: snd.send(m) + + rcv = self.ssn.receiver(snd.target) + if limits: + limit_function = lambda x : limits.get(x, 0) + else: + limit_function = lambda x : default_limit + for expected in fairshare(sorted_(msgs, key=lambda m: priority_level(m.priority,levels), reverse=True), + limit_function, levels): + msg = rcv.fetch(0) + #print "expected priority %s got %s" % (expected.priority, msg.priority) + assert msg.priority == expected.priority + assert msg.content == expected.content + self.ssn.acknowledge(msg) + + def test_prioritised_delivery_1(self): + self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,15,7,8,10,10,2], levels = 10) + + def test_prioritised_delivery_with_alias(self): + self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,15,7,8,10,10,2], levels = 10, key="qpid.priorities") + + def test_prioritised_delivery_2(self): + self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,15,7,8,10,10,2], levels = 5) + + def test_fairshare_1(self): + self.fairshare_delivery(priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,3,10,3,10,10,10,10,10,10,2,3]) + + def test_fairshare_with_alias(self): + self.fairshare_delivery(priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,2,3], level_key="qpid.priorities", fairshare_key="qpid.fairshare") + + def test_fairshare_2(self): + self.fairshare_delivery(priorities = [10 for i in range(30)]) + + def test_fairshare_3(self): + self.fairshare_delivery(priorities = [4,5,3,7,8,8,2,8,2,8,8,16,6,6,6,6,6,6,8,3,5,8,3,5,5,3,3,8,8,3,7,3,7,7,7,8,8,8,2,3], limits={7:0,6:4,5:3,4:2,3:2,2:2,1:2}, levels=8) + + def test_browsing(self): + priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,3,10,3,10,10,10,10,10,10,2,3] + msgs = [Message(content=str(uuid4()), priority = p) for p in priorities] + snd = self.ssn.sender("priority-queue; {create: sender, node: {x-declare:{arguments:{x-qpid-priorities:10}}}}", + durable=self.durable()) + for m in msgs: snd.send(m) + + rcv = self.ssn.receiver("priority-queue; {mode: browse, delete: receiver}") + received = [] + try: + while True: received.append(rcv.fetch(0)) + except Empty: None + #check all messages on the queue were received by the browser; don't relay on any specific ordering at present + assert set([m.content for m in msgs]) == set([m.content for m in received]) + + def ring_queue_check(self, msgs): + """ + Ensure that a ring queue removes lowest priority messages first. + """ + snd = self.ssn.sender(address("priority-ring-queue", arguments="x-qpid-priorities:10, 'qpid.policy_type':ring, 'qpid.max_count':10"), + durable=self.durable()) + for m in msgs: snd.send(m) + + rcv = self.ssn.receiver(snd.target) + received = [] + try: + while True: received.append(rcv.fetch(0)) + except Empty: None + + expected = [] + for m in msgs: + while len(expected) > 9: + expected=sorted_(expected, key=lambda x: priority_level(x.priority,10)) + expected.pop(0) + expected.append(m) + #print "sent %s; expected %s; got %s" % ([m.content for m in msgs], [m.content for m in expected], [m.content for m in received]) + assert [m.content for m in expected] == [m.content for m in received] + + def test_ring_queue_1(self): + priorities = [4,5,3,6,9,9,2,9,2,9,9,1,9,9,9,3,3,3,9,9,3,9,3,9,9,9,9,9,9,2,3] + seq = content("msg") + self.ring_queue_check([Message(content=seq.next(), priority = p) for p in priorities]) + + def test_ring_queue_2(self): + priorities = [9,0,2,3,6,9,9,2,9,2,9,9,1,9,4,7,1,1,3,9,9,3,9,3,9,9,9,1,9,9,2,3,0,9] + seq = content("msg") + self.ring_queue_check([Message(content=seq.next(), priority = p) for p in priorities]) + + def test_requeue(self): + priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,3,10,3,10,10,10,10,10,10,2,3] + msgs = [Message(content=str(uuid4()), priority = p) for p in priorities] + + snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{x-qpid-priorities:10}}}}", + durable=self.durable()) + #want to have some messages requeued so enable prefetch on a dummy receiver + other = self.conn.session() + dummy = other.receiver("priority-queue") + dummy.capacity = 10 + + for m in msgs: snd.send(m) + + #fetch some with dummy receiver on which prefetch is also enabled + for i in range(5): + msg = dummy.fetch(0) + #close session without acknowledgements to requeue messages + other.close() + + #now test delivery works as expected after that + rcv = self.ssn.receiver(snd.target) + for expected in sorted_(msgs, key=lambda m: priority_level(m.priority,10), reverse=True): + msg = rcv.fetch(0) + #print "expected priority %s got %s" % (expected.priority, msg.priority) + assert msg.content == expected.content + self.ssn.acknowledge(msg) + +def content(base, counter=1): + while True: + yield "%s-%s" % (base, counter) + counter += 1 + +def address(name, create_policy="sender", delete_policy="receiver", arguments=None): + if arguments: node = "node: {x-declare:{arguments:{%s}}}" % arguments + else: node = "node: {}" + return "%s; {create: %s, delete: %s, %s}" % (name, create_policy, delete_policy, node) + +def fairshare(msgs, limit, levels): + """ + Generator to return prioritised messages in expected order for a given fairshare limit + """ + count = 0 + last_priority = None + postponed = [] + while msgs or postponed: + if not msgs: + msgs = postponed + count = 0 + last_priority = None + postponed = [] + msg = msgs.pop(0) + if last_priority and priority_level(msg.priority, levels) == last_priority: + count += 1 + else: + last_priority = priority_level(msg.priority, levels) + count = 1 + l = limit(last_priority) + if (l and count > l): + postponed.append(msg) + else: + yield msg + return + +def effective_priority(value, levels): + """ + Method to determine effective priority given a distinct number of + levels supported. Returns the lowest priority value that is of + equivalent priority to the value passed in. + """ + if value <= 5-math.ceil(levels/2.0): return 0 + if value >= 4+math.floor(levels/2.0): return 4+math.floor(levels/2.0) + return value + +def priority_level(value, levels): + """ + Method to determine which of a distinct number of priority levels + a given value falls into. + """ + offset = 5-math.ceil(levels/2.0) + return min(max(value - offset, 0), levels-1) + +def sorted_(msgs, key=None, reverse=False): + """ + Workaround lack of sorted builtin function in python 2.3 and lack + of keyword arguments to list.sort() + """ + temp = msgs + temp.sort(key_to_cmp(key, reverse=reverse)) + return temp + +def key_to_cmp(key, reverse=False): + if key: + if reverse: return lambda a, b: cmp(key(b), key(a)) + else: return lambda a, b: cmp(key(a), key(b)) + else: + return None |