summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-10-03 18:39:52 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-10-03 18:39:52 +0000
commit476f673216adabd0eb9cdef6b760c923c6faad2a (patch)
tree9973a4832b7912cfe621fe5134d1713f52bfb424
parent50c30ee93cfdb958a69ac459b45792e68bc3978e (diff)
downloadqpid-python-476f673216adabd0eb9cdef6b760c923c6faad2a.tar.gz
QPID-3346: checkpoint some new python tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1178511 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py372
1 files changed, 372 insertions, 0 deletions
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py b/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
new file mode 100644
index 0000000000..11b2caad08
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
@@ -0,0 +1,372 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.messaging import *
+from qpid.tests.messaging import Base
+
+from time import sleep
+#
+# Tests the Broker's support for message groups
+#
+
+class MultiConsumerMsgGroupTests(Base):
+ """
+ Tests for the behavior of multi-consumer message groups. These tests allow
+ a messages from the same group be consumed by multiple different clients as
+ long as each message is processed "in sequence". See QPID-3346 for
+ details.
+ """
+
+ def setup_connection(self):
+ return Connection.establish(self.broker, **self.connection_options())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def test_simple(self):
+ """ Verify simple acquire/accept actions on a set of grouped
+ messages shared between two receivers.
+ """
+ ## Create a msg group queue
+
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+ groups = ["A","A","A","B","B","B","C","C","C"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ ## Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+ ## Owners= ---, ---, ---, ---, ---, ---, ---, ---, ---,
+
+ # create consumers on separate sessions: C1,C2
+ s1 = self.setup_session()
+ c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ s2 = self.setup_session()
+ c2 = s2.receiver("msg-group-q", options={"capacity":1})
+
+ # C1 should acquire A-0, then C2 should acquire B-3
+
+ m1 = c1.fetch(0);
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 0
+
+ m2 = c2.fetch(0);
+ assert m2.properties['THE-GROUP'] == 'B'
+ assert m2.content['index'] == 3
+
+ # C1 Acknowledge A-0
+ c1.session.acknowledge(m1);
+
+ # C2 should next acquire A-1
+ m3 = c2.fetch(0);
+ assert m3.properties['THE-GROUP'] == 'A'
+ assert m3.content['index'] == 1
+
+ # C1 should next acquire C-6, since groups A&B are held by c2
+ m4 = c1.fetch(0);
+ assert m4.properties['THE-GROUP'] == 'C'
+ assert m4.content['index'] == 6
+
+ ## Queue = XXX, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+ ## Owners= ---, ^C2, +C2, ^C2, +C2, +C2, ^C1, +C1, +C1,
+
+ # C2 Acknowledge B-3, freeing up the rest of B group
+ c2.session.acknowledge(m2);
+
+ ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, c-6, c-7, c-8...
+ ## Owners= ---, ^C2, +C2, ---, ---, ---, ^C1, +C1, +C1,
+
+ # C1 should now acquire B-4, since it is next "free"
+ m5 = c1.fetch(0);
+ assert m5.properties['THE-GROUP'] == 'B'
+ assert m5.content['index'] == 4
+
+ ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, c-6, c-7, c-8...
+ ## Owners= ---, ^C2, +C2, ---, ^C1, +C1, ^C1, +C1, +C1,
+
+ # C1 acknowledges C-6, freeing the C group
+ c1.session.acknowledge(m4)
+
+ ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, XXX, c-7, c-8...
+ ## Owners= ---, ^C2, +C2, ---, ^C1, +C1, ---, ---, ---
+
+ # C2 should next fetch A-2, followed by C-7
+ m7 = c2.fetch(0);
+ assert m7.properties['THE-GROUP'] == 'A'
+ assert m7.content['index'] == 2
+
+ m8 = c2.fetch(0);
+ assert m8.properties['THE-GROUP'] == 'C'
+ assert m8.content['index'] == 7
+
+ ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, XXX, c-7, c-8...
+ ## Owners= ---, ^C2, ^C2, ---, ^C1, +C1, ---, ^C2, +C2
+
+ # have C2 ack all fetched messages, freeing C-8
+ c2.session.acknowledge()
+
+ ## Queue = XXX, XXX, XXX, XXX, b-4, b-5, XXX, XXX, c-8...
+ ## Owners= ---, ---, ---, ---, ^C1, +C1, ---, ---, ---
+
+ # the next fetch of C2 would get C-8, since B-5 is "owned"
+ m9 = c2.fetch(0);
+ assert m9.properties['THE-GROUP'] == 'C'
+ assert m9.content['index'] == 8
+
+ ## Queue = XXX, XXX, XXX, XXX, b-4, b-5, XXX, XXX, c-8...
+ ## Owners= ---, ---, ---, ---, ^C1, +C1, ---, ---, ^C2
+
+ # C1 acks B-4, freeing B-5 for consumption
+ c1.session.acknowledge(m5)
+
+ ## Queue = XXX, XXX, XXX, XXX, XXX, b-5, XXX, XXX, c-8...
+ ## Owners= ---, ---, ---, ---, ---, ^C2, ---, ---, ^C2
+
+ # the next fetch of C2 would get B-5
+ m10 = c2.fetch(0);
+ assert m10.properties['THE-GROUP'] == 'B'
+ assert m10.content['index'] == 5
+
+ # there should be no more left for C1:
+ try:
+ mx = c1.fetch(0)
+ assert False # should never get here
+ except Empty:
+ pass
+
+ c1.session.acknowledge()
+ c2.session.acknowledge()
+ c1.close()
+ c2.close()
+ snd.close()
+
+ def test_simple_browse(self):
+ """ Test the behavior of a browsing subscription on a message grouping
+ queue.
+ """
+
+ ## Create a msg group queue
+
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+ groups = ["A","B","A","B","C"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ ## Queue = A-0, B-1, A-2, b-3, C-4
+ ## Owners= ---, ---, ---, ---, ---
+
+ # create consumer and browser
+ s1 = self.setup_session()
+ c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ s2 = self.setup_session()
+ b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+
+ # C1 should acquire A-0
+
+ m1 = c1.fetch(0);
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 0
+
+ ## Queue = A-0, B-1, A-2, b-3, C-4
+ ## Owners= ^C1, ---, +C1, ---, ---
+
+ m2 = b1.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'B'
+ assert m2.content['index'] == 1
+
+ # verify that the browser may see A-2, even though its group is owned
+ # by C1
+ m2 = b1.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'A'
+ assert m2.content['index'] == 2
+
+ m2 = b1.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'B'
+ assert m2.content['index'] == 3
+
+ # verify the consumer can own groups currently seen by the browser
+ m3 = c1.fetch(0);
+ assert m3.properties['THE-GROUP'] == 'B'
+ assert m3.content['index'] == 1
+
+ m2 = b1.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'C'
+ assert m2.content['index'] == 4
+
+ def test_release(self):
+ """ Verify releasing a message can free its assocated group
+ """
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+ groups = ["A","A","B","B"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ s1 = self.setup_session()
+ c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ s2 = self.setup_session()
+ c2 = s2.receiver("msg-group-q", options={"capacity":1})
+
+ m1 = c1.fetch(0)
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 0
+
+ m2 = c2.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'B'
+ assert m2.content['index'] == 2
+
+ # C1 release m1, and the first group
+
+ s1.acknowledge(m1, Disposition(RELEASED, set_redelivered=True))
+
+ # C2 should be able to get group 'A', msg 'A-0' now
+ m2 = c2.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'A'
+ assert m2.content['index'] == 0
+
+ def test_reject(self):
+ """ Verify rejecting a message can free its associated group
+ """
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+ groups = ["A","A","B","B"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ s1 = self.setup_session()
+ c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ s2 = self.setup_session()
+ c2 = s2.receiver("msg-group-q", options={"capacity":1})
+
+ m1 = c1.fetch(0)
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 0
+
+ m2 = c2.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'B'
+ assert m2.content['index'] == 2
+
+ # C1 rejects m1, and the first group is released
+ s1.acknowledge(m1, Disposition(REJECTED))
+
+ # C2 should be able to get group 'A', msg 'A-1' now
+ m2 = c2.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'A'
+ assert m2.content['index'] == 1
+
+ def test_close(self):
+ """ Verify behavior when a consumer that 'owns' a group closes.
+ """
+ snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'}}}}")
+
+ groups = ["A","A","B","B"]
+ messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
+ index = 0
+ for m in messages:
+ m.content['index'] = index
+ index += 1
+ snd.send(m)
+
+ s1 = self.setup_session()
+ c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ s2 = self.setup_session()
+ c2 = s2.receiver("msg-group-q", options={"capacity":1})
+
+ # C1 will own group A
+ m1 = c1.fetch(0)
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 0
+
+ # C2 will own group B
+ m2 = c2.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'B'
+ assert m2.content['index'] == 2
+
+ # C1 shuffles off the mortal coil...
+ c1.close();
+
+ # but the session (s1) remains active, so "A" remains blocked
+ # from c2, c2 should fetch the next B-3
+
+ m2 = c2.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'B'
+ assert m2.content['index'] == 3
+
+ # and there should be no more messages available for C2
+ try:
+ m2 = c2.fetch(0)
+ assert False # should never get here
+ except Empty:
+ pass
+
+ # close session s1, releasing the A group
+ s1.close()
+
+ m2 = c2.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'A'
+ assert m2.content['index'] == 0
+
+ m2 = c2.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'A'
+ assert m2.content['index'] == 1
+
+ # and there should be no more messages now
+ try:
+ m2 = c2.fetch(0)
+ assert False # should never get here
+ except Empty:
+ pass
+
+
+
+
+class StickyConsumerMsgGroupTests(Base):
+ """
+ Tests for the behavior of sticky-consumer message groups. These tests
+ expect all messages from the same group be consumed by the same clients.
+ See QPID-3347 for details.
+ """
+ pass # TBD