diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-10-03 18:39:52 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-10-03 18:39:52 +0000 |
commit | 476f673216adabd0eb9cdef6b760c923c6faad2a (patch) | |
tree | 9973a4832b7912cfe621fe5134d1713f52bfb424 | |
parent | 50c30ee93cfdb958a69ac459b45792e68bc3978e (diff) | |
download | qpid-python-476f673216adabd0eb9cdef6b760c923c6faad2a.tar.gz |
QPID-3346: checkpoint some new python tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1178511 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py | 372 |
1 files changed, 372 insertions, 0 deletions
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py b/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py new file mode 100644 index 0000000000..11b2caad08 --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py @@ -0,0 +1,372 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.messaging import * +from qpid.tests.messaging import Base + +from time import sleep +# +# Tests the Broker's support for message groups +# + +class MultiConsumerMsgGroupTests(Base): + """ + Tests for the behavior of multi-consumer message groups. These tests allow + a messages from the same group be consumed by multiple different clients as + long as each message is processed "in sequence". See QPID-3346 for + details. + """ + + def setup_connection(self): + return Connection.establish(self.broker, **self.connection_options()) + + def setup_session(self): + return self.conn.session() + + def test_simple(self): + """ Verify simple acquire/accept actions on a set of grouped + messages shared between two receivers. + """ + ## Create a msg group queue + + snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + + " node: {x-declare: {arguments:" + + " {'qpid.group_header_key':'THE-GROUP'}}}}") + + groups = ["A","A","A","B","B","B","C","C","C"] + messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] + index = 0 + for m in messages: + m.content['index'] = index + index += 1 + snd.send(m) + + ## Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... + ## Owners= ---, ---, ---, ---, ---, ---, ---, ---, ---, + + # create consumers on separate sessions: C1,C2 + s1 = self.setup_session() + c1 = s1.receiver("msg-group-q", options={"capacity":1}) + s2 = self.setup_session() + c2 = s2.receiver("msg-group-q", options={"capacity":1}) + + # C1 should acquire A-0, then C2 should acquire B-3 + + m1 = c1.fetch(0); + assert m1.properties['THE-GROUP'] == 'A' + assert m1.content['index'] == 0 + + m2 = c2.fetch(0); + assert m2.properties['THE-GROUP'] == 'B' + assert m2.content['index'] == 3 + + # C1 Acknowledge A-0 + c1.session.acknowledge(m1); + + # C2 should next acquire A-1 + m3 = c2.fetch(0); + assert m3.properties['THE-GROUP'] == 'A' + assert m3.content['index'] == 1 + + # C1 should next acquire C-6, since groups A&B are held by c2 + m4 = c1.fetch(0); + assert m4.properties['THE-GROUP'] == 'C' + assert m4.content['index'] == 6 + + ## Queue = XXX, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... + ## Owners= ---, ^C2, +C2, ^C2, +C2, +C2, ^C1, +C1, +C1, + + # C2 Acknowledge B-3, freeing up the rest of B group + c2.session.acknowledge(m2); + + ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, c-6, c-7, c-8... + ## Owners= ---, ^C2, +C2, ---, ---, ---, ^C1, +C1, +C1, + + # C1 should now acquire B-4, since it is next "free" + m5 = c1.fetch(0); + assert m5.properties['THE-GROUP'] == 'B' + assert m5.content['index'] == 4 + + ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, c-6, c-7, c-8... + ## Owners= ---, ^C2, +C2, ---, ^C1, +C1, ^C1, +C1, +C1, + + # C1 acknowledges C-6, freeing the C group + c1.session.acknowledge(m4) + + ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, XXX, c-7, c-8... + ## Owners= ---, ^C2, +C2, ---, ^C1, +C1, ---, ---, --- + + # C2 should next fetch A-2, followed by C-7 + m7 = c2.fetch(0); + assert m7.properties['THE-GROUP'] == 'A' + assert m7.content['index'] == 2 + + m8 = c2.fetch(0); + assert m8.properties['THE-GROUP'] == 'C' + assert m8.content['index'] == 7 + + ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, XXX, c-7, c-8... + ## Owners= ---, ^C2, ^C2, ---, ^C1, +C1, ---, ^C2, +C2 + + # have C2 ack all fetched messages, freeing C-8 + c2.session.acknowledge() + + ## Queue = XXX, XXX, XXX, XXX, b-4, b-5, XXX, XXX, c-8... + ## Owners= ---, ---, ---, ---, ^C1, +C1, ---, ---, --- + + # the next fetch of C2 would get C-8, since B-5 is "owned" + m9 = c2.fetch(0); + assert m9.properties['THE-GROUP'] == 'C' + assert m9.content['index'] == 8 + + ## Queue = XXX, XXX, XXX, XXX, b-4, b-5, XXX, XXX, c-8... + ## Owners= ---, ---, ---, ---, ^C1, +C1, ---, ---, ^C2 + + # C1 acks B-4, freeing B-5 for consumption + c1.session.acknowledge(m5) + + ## Queue = XXX, XXX, XXX, XXX, XXX, b-5, XXX, XXX, c-8... + ## Owners= ---, ---, ---, ---, ---, ^C2, ---, ---, ^C2 + + # the next fetch of C2 would get B-5 + m10 = c2.fetch(0); + assert m10.properties['THE-GROUP'] == 'B' + assert m10.content['index'] == 5 + + # there should be no more left for C1: + try: + mx = c1.fetch(0) + assert False # should never get here + except Empty: + pass + + c1.session.acknowledge() + c2.session.acknowledge() + c1.close() + c2.close() + snd.close() + + def test_simple_browse(self): + """ Test the behavior of a browsing subscription on a message grouping + queue. + """ + + ## Create a msg group queue + + snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + + " node: {x-declare: {arguments:" + + " {'qpid.group_header_key':'THE-GROUP'}}}}") + + groups = ["A","B","A","B","C"] + messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] + index = 0 + for m in messages: + m.content['index'] = index + index += 1 + snd.send(m) + + ## Queue = A-0, B-1, A-2, b-3, C-4 + ## Owners= ---, ---, ---, ---, --- + + # create consumer and browser + s1 = self.setup_session() + c1 = s1.receiver("msg-group-q", options={"capacity":1}) + s2 = self.setup_session() + b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1}) + + # C1 should acquire A-0 + + m1 = c1.fetch(0); + assert m1.properties['THE-GROUP'] == 'A' + assert m1.content['index'] == 0 + + ## Queue = A-0, B-1, A-2, b-3, C-4 + ## Owners= ^C1, ---, +C1, ---, --- + + m2 = b1.fetch(0) + assert m2.properties['THE-GROUP'] == 'B' + assert m2.content['index'] == 1 + + # verify that the browser may see A-2, even though its group is owned + # by C1 + m2 = b1.fetch(0) + assert m2.properties['THE-GROUP'] == 'A' + assert m2.content['index'] == 2 + + m2 = b1.fetch(0) + assert m2.properties['THE-GROUP'] == 'B' + assert m2.content['index'] == 3 + + # verify the consumer can own groups currently seen by the browser + m3 = c1.fetch(0); + assert m3.properties['THE-GROUP'] == 'B' + assert m3.content['index'] == 1 + + m2 = b1.fetch(0) + assert m2.properties['THE-GROUP'] == 'C' + assert m2.content['index'] == 4 + + def test_release(self): + """ Verify releasing a message can free its assocated group + """ + snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + + " node: {x-declare: {arguments:" + + " {'qpid.group_header_key':'THE-GROUP'}}}}") + + groups = ["A","A","B","B"] + messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] + index = 0 + for m in messages: + m.content['index'] = index + index += 1 + snd.send(m) + + s1 = self.setup_session() + c1 = s1.receiver("msg-group-q", options={"capacity":1}) + s2 = self.setup_session() + c2 = s2.receiver("msg-group-q", options={"capacity":1}) + + m1 = c1.fetch(0) + assert m1.properties['THE-GROUP'] == 'A' + assert m1.content['index'] == 0 + + m2 = c2.fetch(0) + assert m2.properties['THE-GROUP'] == 'B' + assert m2.content['index'] == 2 + + # C1 release m1, and the first group + + s1.acknowledge(m1, Disposition(RELEASED, set_redelivered=True)) + + # C2 should be able to get group 'A', msg 'A-0' now + m2 = c2.fetch(0) + assert m2.properties['THE-GROUP'] == 'A' + assert m2.content['index'] == 0 + + def test_reject(self): + """ Verify rejecting a message can free its associated group + """ + snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + + " node: {x-declare: {arguments:" + + " {'qpid.group_header_key':'THE-GROUP'}}}}") + + groups = ["A","A","B","B"] + messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] + index = 0 + for m in messages: + m.content['index'] = index + index += 1 + snd.send(m) + + s1 = self.setup_session() + c1 = s1.receiver("msg-group-q", options={"capacity":1}) + s2 = self.setup_session() + c2 = s2.receiver("msg-group-q", options={"capacity":1}) + + m1 = c1.fetch(0) + assert m1.properties['THE-GROUP'] == 'A' + assert m1.content['index'] == 0 + + m2 = c2.fetch(0) + assert m2.properties['THE-GROUP'] == 'B' + assert m2.content['index'] == 2 + + # C1 rejects m1, and the first group is released + s1.acknowledge(m1, Disposition(REJECTED)) + + # C2 should be able to get group 'A', msg 'A-1' now + m2 = c2.fetch(0) + assert m2.properties['THE-GROUP'] == 'A' + assert m2.content['index'] == 1 + + def test_close(self): + """ Verify behavior when a consumer that 'owns' a group closes. + """ + snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," + + " node: {x-declare: {arguments:" + + " {'qpid.group_header_key':'THE-GROUP'}}}}") + + groups = ["A","A","B","B"] + messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups] + index = 0 + for m in messages: + m.content['index'] = index + index += 1 + snd.send(m) + + s1 = self.setup_session() + c1 = s1.receiver("msg-group-q", options={"capacity":1}) + s2 = self.setup_session() + c2 = s2.receiver("msg-group-q", options={"capacity":1}) + + # C1 will own group A + m1 = c1.fetch(0) + assert m1.properties['THE-GROUP'] == 'A' + assert m1.content['index'] == 0 + + # C2 will own group B + m2 = c2.fetch(0) + assert m2.properties['THE-GROUP'] == 'B' + assert m2.content['index'] == 2 + + # C1 shuffles off the mortal coil... + c1.close(); + + # but the session (s1) remains active, so "A" remains blocked + # from c2, c2 should fetch the next B-3 + + m2 = c2.fetch(0) + assert m2.properties['THE-GROUP'] == 'B' + assert m2.content['index'] == 3 + + # and there should be no more messages available for C2 + try: + m2 = c2.fetch(0) + assert False # should never get here + except Empty: + pass + + # close session s1, releasing the A group + s1.close() + + m2 = c2.fetch(0) + assert m2.properties['THE-GROUP'] == 'A' + assert m2.content['index'] == 0 + + m2 = c2.fetch(0) + assert m2.properties['THE-GROUP'] == 'A' + assert m2.content['index'] == 1 + + # and there should be no more messages now + try: + m2 = c2.fetch(0) + assert False # should never get here + except Empty: + pass + + + + +class StickyConsumerMsgGroupTests(Base): + """ + Tests for the behavior of sticky-consumer message groups. These tests + expect all messages from the same group be consumed by the same clients. + See QPID-3347 for details. + """ + pass # TBD |