summaryrefslogtreecommitdiff
path: root/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/src/py/qpid_tests/broker_0_10/msg_groups.py')
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/msg_groups.py1077
1 files changed, 0 insertions, 1077 deletions
diff --git a/tests/src/py/qpid_tests/broker_0_10/msg_groups.py b/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
deleted file mode 100644
index 99d11151e8..0000000000
--- a/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
+++ /dev/null
@@ -1,1077 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from qpid.messaging import *
-from qpid.tests.messaging import Base
-import qmf.console
-
-from time import sleep
-#
-# Tests the Broker's support for message groups
-#
-
-class MultiConsumerMsgGroupTests(Base):
- """
- Tests for the behavior of multi-consumer message groups. These tests allow
- a messages from the same group be consumed by multiple different clients as
- long as each message is processed "in sequence". See QPID-3346 for
- details.
- """
-
- def setup_connection(self):
- return Connection.establish(self.broker, **self.connection_options())
-
- def setup_session(self):
- return self.conn.session()
-
- def test_simple(self):
- """ Verify simple acquire/accept actions on a set of grouped
- messages shared between two receivers.
- """
- ## Create a msg group queue
-
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","A","A","B","B","B","C","C","C"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- ## Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
- ## Owners= ---, ---, ---, ---, ---, ---, ---, ---, ---,
-
- # create consumers on separate sessions: C1,C2
- s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":0})
- s2 = self.setup_session()
- c2 = s2.receiver("msg-group-q", options={"capacity":0})
-
- # C1 should acquire A-0, then C2 should acquire B-3
-
- m1 = c1.fetch(0);
- assert m1.properties['THE-GROUP'] == 'A'
- assert m1.content['index'] == 0
-
- m2 = c2.fetch(0);
- assert m2.properties['THE-GROUP'] == 'B'
- assert m2.content['index'] == 3
-
- # C1 Acknowledge A-0
- c1.session.acknowledge(m1);
-
- # C2 should next acquire A-1
- m3 = c2.fetch(0);
- assert m3.properties['THE-GROUP'] == 'A'
- assert m3.content['index'] == 1
-
- # C1 should next acquire C-6, since groups A&B are held by c2
- m4 = c1.fetch(0);
- assert m4.properties['THE-GROUP'] == 'C'
- assert m4.content['index'] == 6
-
- ## Queue = XXX, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
- ## Owners= ---, ^C2, +C2, ^C2, +C2, +C2, ^C1, +C1, +C1,
-
- # C2 Acknowledge B-3, freeing up the rest of B group
- c2.session.acknowledge(m2);
-
- ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, c-6, c-7, c-8...
- ## Owners= ---, ^C2, +C2, ---, ---, ---, ^C1, +C1, +C1,
-
- # C1 should now acquire B-4, since it is next "free"
- m5 = c1.fetch(0);
- assert m5.properties['THE-GROUP'] == 'B'
- assert m5.content['index'] == 4
-
- ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, c-6, c-7, c-8...
- ## Owners= ---, ^C2, +C2, ---, ^C1, +C1, ^C1, +C1, +C1,
-
- # C1 acknowledges C-6, freeing the C group
- c1.session.acknowledge(m4)
-
- ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, XXX, c-7, c-8...
- ## Owners= ---, ^C2, +C2, ---, ^C1, +C1, ---, ---, ---
-
- # C2 should next fetch A-2, followed by C-7
- m7 = c2.fetch(0);
- assert m7.properties['THE-GROUP'] == 'A'
- assert m7.content['index'] == 2
-
- m8 = c2.fetch(0);
- assert m8.properties['THE-GROUP'] == 'C'
- assert m8.content['index'] == 7
-
- ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, XXX, c-7, c-8...
- ## Owners= ---, ^C2, ^C2, ---, ^C1, +C1, ---, ^C2, +C2
-
- # have C2 ack all fetched messages, freeing C-8
- c2.session.acknowledge()
-
- ## Queue = XXX, XXX, XXX, XXX, b-4, b-5, XXX, XXX, c-8...
- ## Owners= ---, ---, ---, ---, ^C1, +C1, ---, ---, ---
-
- # the next fetch of C2 would get C-8, since B-5 is "owned"
- m9 = c2.fetch(0);
- assert m9.properties['THE-GROUP'] == 'C'
- assert m9.content['index'] == 8
-
- ## Queue = XXX, XXX, XXX, XXX, b-4, b-5, XXX, XXX, c-8...
- ## Owners= ---, ---, ---, ---, ^C1, +C1, ---, ---, ^C2
-
- # C1 acks B-4, freeing B-5 for consumption
- c1.session.acknowledge(m5)
-
- ## Queue = XXX, XXX, XXX, XXX, XXX, b-5, XXX, XXX, c-8...
- ## Owners= ---, ---, ---, ---, ---, ^C2, ---, ---, ^C2
-
- # the next fetch of C2 would get B-5
- m10 = c2.fetch(0);
- assert m10.properties['THE-GROUP'] == 'B'
- assert m10.content['index'] == 5
-
- # there should be no more left for C1:
- try:
- mx = c1.fetch(0)
- assert False # should never get here
- except Empty:
- pass
-
- c1.session.acknowledge()
- c2.session.acknowledge()
- c1.close()
- c2.close()
- snd.close()
-
- def test_simple_browse(self):
- """ Test the behavior of a browsing subscription on a message grouping
- queue.
- """
-
- ## Create a msg group queue
-
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","B","A","B","C"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- ## Queue = A-0, B-1, A-2, b-3, C-4
- ## Owners= ---, ---, ---, ---, ---
-
- # create consumer and browser
- s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":0})
- s2 = self.setup_session()
- b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
-
- # C1 should acquire A-0
-
- m1 = c1.fetch(0);
- assert m1.properties['THE-GROUP'] == 'A'
- assert m1.content['index'] == 0
-
- ## Queue = A-0, B-1, A-2, b-3, C-4
- ## Owners= ^C1, ---, +C1, ---, ---
-
- m2 = b1.fetch(0)
- assert m2.properties['THE-GROUP'] == 'B'
- assert m2.content['index'] == 1
-
- # verify that the browser may see A-2, even though its group is owned
- # by C1
- m2 = b1.fetch(0)
- assert m2.properties['THE-GROUP'] == 'A'
- assert m2.content['index'] == 2
-
- m2 = b1.fetch(0)
- assert m2.properties['THE-GROUP'] == 'B'
- assert m2.content['index'] == 3
-
- # verify the consumer can own groups currently seen by the browser
- m3 = c1.fetch(0);
- assert m3.properties['THE-GROUP'] == 'B'
- assert m3.content['index'] == 1
-
- m2 = b1.fetch(0)
- assert m2.properties['THE-GROUP'] == 'C'
- assert m2.content['index'] == 4
-
- def test_release(self):
- """ Verify releasing a message can free its assocated group
- """
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","A","B","B"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":0})
- s2 = self.setup_session()
- c2 = s2.receiver("msg-group-q", options={"capacity":0})
-
- m1 = c1.fetch(0)
- assert m1.properties['THE-GROUP'] == 'A'
- assert m1.content['index'] == 0
-
- m2 = c2.fetch(0)
- assert m2.properties['THE-GROUP'] == 'B'
- assert m2.content['index'] == 2
-
- # C1 release m1, and the first group
-
- s1.acknowledge(m1, Disposition(RELEASED, set_redelivered=True))
-
- # C2 should be able to get group 'A', msg 'A-0' now
- m2 = c2.fetch(0)
- assert m2.properties['THE-GROUP'] == 'A'
- assert m2.content['index'] == 0
-
- def test_reject(self):
- """ Verify rejecting a message can free its associated group
- """
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","A","B","B"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":0})
- s2 = self.setup_session()
- c2 = s2.receiver("msg-group-q", options={"capacity":0})
-
- m1 = c1.fetch(0)
- assert m1.properties['THE-GROUP'] == 'A'
- assert m1.content['index'] == 0
-
- m2 = c2.fetch(0)
- assert m2.properties['THE-GROUP'] == 'B'
- assert m2.content['index'] == 2
-
- # C1 rejects m1, and the first group is released
- s1.acknowledge(m1, Disposition(REJECTED))
-
- # C2 should be able to get group 'A', msg 'A-1' now
- m2 = c2.fetch(0)
- assert m2.properties['THE-GROUP'] == 'A'
- assert m2.content['index'] == 1
-
- def test_close(self):
- """ Verify behavior when a consumer that 'owns' a group closes.
- """
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","A","B","B"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":0})
- s2 = self.setup_session()
- c2 = s2.receiver("msg-group-q", options={"capacity":0})
-
- # C1 will own group A
- m1 = c1.fetch(0)
- assert m1.properties['THE-GROUP'] == 'A'
- assert m1.content['index'] == 0
-
- # C2 will own group B
- m2 = c2.fetch(0)
- assert m2.properties['THE-GROUP'] == 'B'
- assert m2.content['index'] == 2
-
- # C1 shuffles off the mortal coil...
- c1.close();
-
- # but the session (s1) remains active, so "A" remains blocked
- # from c2, c2 should fetch the next B-3
-
- m2 = c2.fetch(0)
- assert m2.properties['THE-GROUP'] == 'B'
- assert m2.content['index'] == 3
-
- # and there should be no more messages available for C2
- try:
- m2 = c2.fetch(0)
- assert False # should never get here
- except Empty:
- pass
-
- # close session s1, releasing the A group
- s1.close()
-
- m2 = c2.fetch(0)
- assert m2.properties['THE-GROUP'] == 'A'
- assert m2.content['index'] == 0
-
- m2 = c2.fetch(0)
- assert m2.properties['THE-GROUP'] == 'A'
- assert m2.content['index'] == 1
-
- # and there should be no more messages now
- try:
- m2 = c2.fetch(0)
- assert False # should never get here
- except Empty:
- pass
-
- def test_transaction(self):
- """ Verify behavior when using transactions.
- """
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","A","B","B","A","B"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- s1 = self.conn.session(transactional=True)
- c1 = s1.receiver("msg-group-q", options={"capacity":0})
- s2 = self.conn.session(transactional=True)
- c2 = s2.receiver("msg-group-q", options={"capacity":0})
-
- # C1 gets group A
- m1 = c1.fetch(0)
- assert m1.properties['THE-GROUP'] == 'A'
- assert m1.content['index'] == 0
-
- # C2 gets group B
- m2 = c2.fetch(0)
- assert m2.properties['THE-GROUP'] == 'B'
- assert m2.content['index'] == 2
-
- s1.acknowledge(m1) # A-0 consumed, A group freed
- s2.acknowledge(m2) # B-2 consumed, B group freed
-
- s1.commit() # A-0 consumption done, A group now free
- s2.rollback() # releases B-2, and group B
-
- ## Q: ["A1","B2","B3","A4","B5"]
-
- # C2 should be able to get the next A
- m3 = c2.fetch(0)
- assert m3.properties['THE-GROUP'] == 'A'
- assert m3.content['index'] == 1
-
- # C1 should be able to get B-2
- m4 = c1.fetch(0)
- assert m4.properties['THE-GROUP'] == 'B'
- assert m4.content['index'] == 2
-
- s2.acknowledge(m3) # C2 consumes A-1
- s1.acknowledge(m4) # C1 consumes B-2
- s1.commit() # C1 consume B-2 occurs, free group B
-
- ## Q: [["A1",]"B3","A4","B5"]
-
- # A-1 is still considered owned by C2, since the commit has yet to
- # occur, so the next available to C1 would be B-3
- m5 = c1.fetch(0) # B-3
- assert m5.properties['THE-GROUP'] == 'B'
- assert m5.content['index'] == 3
-
- # and C2 should find A-4 available, since it owns the A group
- m6 = c2.fetch(0) # A-4
- assert m6.properties['THE-GROUP'] == 'A'
- assert m6.content['index'] == 4
-
- s2.acknowledge(m6) # C2 consumes A-4
-
- # uh-oh, A-1 and A-4 released, along with A group
- s2.rollback()
-
- ## Q: ["A1",["B3"],"A4","B5"]
- m7 = c1.fetch(0) # A-1 is found
- assert m7.properties['THE-GROUP'] == 'A'
- assert m7.content['index'] == 1
-
- ## Q: [["A1"],["B3"],"A4","B5"]
- # since C1 "owns" both A and B group, C2 should find nothing available
- try:
- m8 = c2.fetch(0)
- assert False # should not get here
- except Empty:
- pass
-
- # C1 next gets A4
- m9 = c1.fetch(0)
- assert m9.properties['THE-GROUP'] == 'A'
- assert m9.content['index'] == 4
-
- s1.acknowledge()
-
- ## Q: [["A1"],["B3"],["A4"],"B5"]
- # even though C1 acknowledges A1,B3, and A4, B5 is still considered
- # owned as the commit has yet to take place
- try:
- m10 = c2.fetch(0)
- assert False # should not get here
- except Empty:
- pass
-
- # now A1,B3,A4 dequeued, B5 should be free
- s1.commit()
-
- ## Q: ["B5"]
- m11 = c2.fetch(0)
- assert m11.properties['THE-GROUP'] == 'B'
- assert m11.content['index'] == 5
-
- s2.acknowledge()
- s2.commit()
-
- def test_send_transaction(self):
- """ Verify behavior when sender is using transactions.
- """
- ssn = self.conn.session(transactional=True)
- snd = ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- msg = Message(content={'index':0}, properties={"THE-GROUP": "A"})
- snd.send(msg)
- msg = Message(content={'index':1}, properties={"THE-GROUP": "B"})
- snd.send(msg)
- snd.session.commit()
- msg = Message(content={'index':2}, properties={"THE-GROUP": "A"})
- snd.send(msg)
-
- # Queue: [A0,B1, (uncommitted: A2) ]
-
- s1 = self.conn.session(transactional=True)
- c1 = s1.receiver("msg-group-q", options={"capacity":0})
- s2 = self.conn.session(transactional=True)
- c2 = s2.receiver("msg-group-q", options={"capacity":0})
-
- # C1 gets A0, group A
- m1 = c1.fetch(0)
- assert m1.properties['THE-GROUP'] == 'A'
- assert m1.content['index'] == 0
-
- # C2 gets B2, group B
- m2 = c2.fetch(0)
- assert m2.properties['THE-GROUP'] == 'B'
- assert m2.content['index'] == 1
-
- # Since A2 uncommitted, there should be nothing left to fetch
- try:
- mX = c1.fetch(0)
- assert False # should not get here
- except Empty:
- pass
- try:
- mX = c2.fetch(0)
- assert False # should not get here
- except Empty:
- pass
-
- snd.session.commit()
- msg = Message(content={'index':3}, properties={"THE-GROUP": "B"})
- snd.send(msg)
-
- # Queue: [A2, (uncommitted: B3) ]
-
- # B3 has yet to be committed, so C2 should see nothing available:
- try:
- mX = c2.fetch(0)
- assert False # should not get here
- except Empty:
- pass
-
- # but A2 should be available to C1
- m3 = c1.fetch(0)
- assert m3.properties['THE-GROUP'] == 'A'
- assert m3.content['index'] == 2
-
- # now make B3 available
- snd.session.commit()
-
- # C1 should still be done:
- try:
- mX = c1.fetch(0)
- assert False # should not get here
- except Empty:
- pass
-
- # but C2 should find the new B
- m4 = c2.fetch(0)
- assert m4.properties['THE-GROUP'] == 'B'
- assert m4.content['index'] == 3
-
- # extra: have C1 rollback, verify C2 finds the released 'A' messages
- c1.session.rollback()
-
- ## Q: ["A0","A2"]
-
- # C2 should be able to get the next A
- m5 = c2.fetch(0)
- assert m5.properties['THE-GROUP'] == 'A'
- assert m5.content['index'] == 0
-
- m6 = c2.fetch(0)
- assert m6.properties['THE-GROUP'] == 'A'
- assert m6.content['index'] == 2
-
- c2.session.acknowledge()
- c2.session.commit()
-
- def test_query(self):
- """ Verify the queue query method against message groups
- """
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","B","C","A","B","C","A"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":0})
- s2 = self.setup_session()
- c2 = s2.receiver("msg-group-q", options={"capacity":0})
-
- m1 = c1.fetch(0)
- m2 = c2.fetch(0)
-
- # at this point, group A should be owned by C1, group B by C2, and
- # group C should be available
-
- # now setup a QMF session, so we can call methods
- self.qmf_session = qmf.console.Session()
- self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
- brokers = self.qmf_session.getObjects(_class="broker")
- assert len(brokers) == 1
- broker = brokers[0]
-
- # verify the query method call's group information
- rc = broker.query("queue", "msg-group-q")
- assert rc.status == 0
- assert rc.text == "OK"
- results = rc.outArgs['results']
- assert 'qpid.message_group_queue' in results
- q_info = results['qpid.message_group_queue']
- assert 'group_header_key' in q_info and q_info['group_header_key'] == "THE-GROUP"
- assert 'group_state' in q_info and len(q_info['group_state']) == 3
- for g_info in q_info['group_state']:
- assert 'group_id' in g_info
- if g_info['group_id'] == "A":
- assert g_info['msg_count'] == 3
- assert g_info['consumer'] != ""
- elif g_info['group_id'] == "B":
- assert g_info['msg_count'] == 2
- assert g_info['consumer'] != ""
- elif g_info['group_id'] == "C":
- assert g_info['msg_count'] == 2
- assert g_info['consumer'] == ""
- else:
- assert(False) # should never get here
- self.qmf_session.delBroker(self.qmf_broker)
-
- def test_purge_free(self):
- """ Verify we can purge a queue of all messages of a given "unowned"
- group.
- """
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","B","A","B","C","A"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- # now setup a QMF session, so we can call methods
- self.qmf_session = qmf.console.Session()
- self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
- queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
- assert queue
- msg_filter = { 'filter_type' : 'header_match_str',
- 'filter_params' : { 'header_key' : "THE-GROUP",
- 'header_value' : "B" }}
- assert queue.msgDepth == 6
- rc = queue.purge(0, msg_filter)
- assert rc.status == 0
- queue.update()
- assert queue.msgDepth == 4
-
- # verify all B's removed....
- s2 = self.setup_session()
- b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
- count = 0
- try:
- while True:
- m2 = b1.fetch(0)
- assert m2.properties['THE-GROUP'] != 'B'
- count += 1
- except Empty:
- pass
- assert count == 4
-
- self.qmf_session.delBroker(self.qmf_broker)
-
- def test_purge_acquired(self):
- """ Verify we can purge messages from an acquired group.
- """
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","B","A","B","C","A"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- # acquire group "A"
- s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":0})
- m1 = c1.fetch(0)
- assert m1.properties['THE-GROUP'] == 'A'
- assert m1.content['index'] == 0
-
- # now setup a QMF session, so we can purge group A
- self.qmf_session = qmf.console.Session()
- self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
- queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
- assert queue
- msg_filter = { 'filter_type' : 'header_match_str',
- 'filter_params' : { 'header_key' : "THE-GROUP",
- 'header_value' : "A" }}
- assert queue.msgDepth == 6
- rc = queue.purge(0, msg_filter)
- assert rc.status == 0
- queue.update()
- queue.msgDepth == 4 # the pending acquired A still counts!
-
- # verify all other A's removed....
- s2 = self.setup_session()
- b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
- count = 0
- try:
- while True:
- m2 = b1.fetch(0)
- assert m2.properties['THE-GROUP'] != 'A'
- count += 1
- except Empty:
- pass
- assert count == 3 # only 3 really available
- s1.acknowledge() # ack the consumed A-0
- self.qmf_session.delBroker(self.qmf_broker)
-
- def test_purge_count(self):
- """ Verify we can purge a fixed number of messages from an acquired
- group.
- """
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","B","A","B","C","A"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- # acquire group "A"
- s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":0})
- m1 = c1.fetch(0)
- assert m1.properties['THE-GROUP'] == 'A'
- assert m1.content['index'] == 0
-
- # now setup a QMF session, so we can purge group A
- self.qmf_session = qmf.console.Session()
- self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
- queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
- assert queue
- msg_filter = { 'filter_type' : 'header_match_str',
- 'filter_params' : { 'header_key' : "THE-GROUP",
- 'header_value' : "A" }}
- assert queue.msgDepth == 6
- rc = queue.purge(1, msg_filter)
- assert rc.status == 0
- queue.update()
- queue.msgDepth == 5 # the pending acquired A still counts!
-
- # verify all other A's removed....
- s2 = self.setup_session()
- b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
- count = 0
- a_count = 0
- try:
- while True:
- m2 = b1.fetch(0)
- if m2.properties['THE-GROUP'] != 'A':
- count += 1
- else:
- a_count += 1
- except Empty:
- pass
- assert count == 3 # non-A's
- assert a_count == 1 # and one is an A
- s1.acknowledge() # ack the consumed A-0
- self.qmf_session.delBroker(self.qmf_broker)
-
- def test_move_all(self):
- """ Verify we can move messages from an acquired group.
- """
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","B","A","B","C","A"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- # set up destination queue
- rcvr = self.ssn.receiver("dest-q; {create:always, delete:receiver," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- # acquire group "A"
- s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":0})
- m1 = c1.fetch(0)
- assert m1.properties['THE-GROUP'] == 'A'
- assert m1.content['index'] == 0
-
- # now setup a QMF session, so we can move what's left of group A
- self.qmf_session = qmf.console.Session()
- self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
- brokers = self.qmf_session.getObjects(_class="broker")
- assert len(brokers) == 1
- broker = brokers[0]
- msg_filter = { 'filter_type' : 'header_match_str',
- 'filter_params' : { 'header_key' : "THE-GROUP",
- 'header_value' : "A" }}
- rc = broker.queueMoveMessages("msg-group-q", "dest-q", 0, msg_filter)
- assert rc.status == 0
-
- # verify all other A's removed from msg-group-q
- s2 = self.setup_session()
- b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
- count = 0
- try:
- while True:
- m2 = b1.fetch(0)
- assert m2.properties['THE-GROUP'] != 'A'
- count += 1
- except Empty:
- pass
- assert count == 3 # only 3 really available
-
- # verify the moved A's are at the dest-q
- s2 = self.setup_session()
- b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":0})
- count = 0
- try:
- while True:
- m2 = b1.fetch(0)
- assert m2.properties['THE-GROUP'] == 'A'
- assert m2.content['index'] == 2 or m2.content['index'] == 5
- count += 1
- except Empty:
- pass
- assert count == 2 # two A's moved
-
- s1.acknowledge() # ack the consumed A-0
- self.qmf_session.delBroker(self.qmf_broker)
-
- def test_move_count(self):
- """ Verify we can move a fixed number of messages from an acquired group.
- """
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","B","A","B","C","A"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- # set up destination queue
- rcvr = self.ssn.receiver("dest-q; {create:always, delete:receiver," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- # now setup a QMF session, so we can move group B
- self.qmf_session = qmf.console.Session()
- self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
- brokers = self.qmf_session.getObjects(_class="broker")
- assert len(brokers) == 1
- broker = brokers[0]
- msg_filter = { 'filter_type' : 'header_match_str',
- 'filter_params' : { 'header_key' : "THE-GROUP",
- 'header_value' : "B" }}
- rc = broker.queueMoveMessages("msg-group-q", "dest-q", 3, msg_filter)
- assert rc.status == 0
-
- # verify all B's removed from msg-group-q
- s2 = self.setup_session()
- b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
- count = 0
- try:
- while True:
- m2 = b1.fetch(0)
- assert m2.properties['THE-GROUP'] != 'B'
- count += 1
- except Empty:
- pass
- assert count == 4
-
- # verify the moved B's are at the dest-q
- s2 = self.setup_session()
- b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":0})
- count = 0
- try:
- while True:
- m2 = b1.fetch(0)
- assert m2.properties['THE-GROUP'] == 'B'
- assert m2.content['index'] == 1 or m2.content['index'] == 3
- count += 1
- except Empty:
- pass
- assert count == 2
-
- self.qmf_session.delBroker(self.qmf_broker)
-
- def test_reroute(self):
- """ Verify we can reroute messages from an acquired group.
- """
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","B","A","B","C","A"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- # create a topic exchange for the reroute
- rcvr = self.ssn.receiver("reroute-q; {create: always, delete:receiver," +
- " node: {type: topic}}")
-
- # acquire group "A"
- s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":0})
- m1 = c1.fetch(0)
- assert m1.properties['THE-GROUP'] == 'A'
- assert m1.content['index'] == 0
-
- # now setup a QMF session, so we can reroute group A
- self.qmf_session = qmf.console.Session()
- self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
- queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
- assert queue
- msg_filter = { 'filter_type' : 'header_match_str',
- 'filter_params' : { 'header_key' : "THE-GROUP",
- 'header_value' : "A" }}
- assert queue.msgDepth == 6
- rc = queue.reroute(0, False, "reroute-q", msg_filter)
- assert rc.status == 0
- queue.update()
- queue.msgDepth == 4 # the pending acquired A still counts!
-
- # verify all other A's removed....
- s2 = self.setup_session()
- b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
- count = 0
- try:
- while True:
- m2 = b1.fetch(0)
- assert m2.properties['THE-GROUP'] != 'A'
- count += 1
- except Empty:
- pass
- assert count == 3 # only 3 really available
-
- # and what of reroute-q?
- count = 0
- try:
- while True:
- m2 = rcvr.fetch(0)
- assert m2.properties['THE-GROUP'] == 'A'
- assert m2.content['index'] == 2 or m2.content['index'] == 5
- count += 1
- except Empty:
- pass
- assert count == 2
-
- s1.acknowledge() # ack the consumed A-0
- self.qmf_session.delBroker(self.qmf_broker)
-
- def test_queue_delete(self):
- """ Test deleting a queue while consumers are active.
- """
-
- ## Create a msg group queue
-
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- groups = ["A","B","A","B","C"]
- messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
- index = 0
- for m in messages:
- m.content['index'] = index
- index += 1
- snd.send(m)
-
- ## Queue = A-0, B-1, A-2, b-3, C-4
- ## Owners= ---, ---, ---, ---, ---
-
- # create consumers
- s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":0})
- s2 = self.setup_session()
- c2 = s2.receiver("msg-group-q", options={"capacity":0})
-
- # C1 should acquire A-0
- m1 = c1.fetch(0);
- assert m1.properties['THE-GROUP'] == 'A'
- assert m1.content['index'] == 0
-
- # c2 acquires B-1
- m2 = c2.fetch(0)
- assert m2.properties['THE-GROUP'] == 'B'
- assert m2.content['index'] == 1
-
- # with group A and B owned, and C free, delete the
- # queue
- snd.close()
- self.ssn.close()
-
- def test_default_group_id(self):
- """ Verify the queue assigns the default group id should a message
- arrive without a group identifier.
- """
- snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
- " node: {x-declare: {arguments:" +
- " {'qpid.group_header_key':'THE-GROUP'," +
- "'qpid.shared_msg_group':1}}}}")
-
- m = Message(content={}, properties={"NO-GROUP-HEADER":"HA-HA"})
- snd.send(m)
-
- # now setup a QMF session, so we can call methods
- self.qmf_session = qmf.console.Session()
- self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
- brokers = self.qmf_session.getObjects(_class="broker")
- assert len(brokers) == 1
- broker = brokers[0]
-
- # grab the group state off the queue, and verify the default group is
- # present ("qpid.no-group" is the broker default)
- rc = broker.query("queue", "msg-group-q")
- assert rc.status == 0
- assert rc.text == "OK"
- results = rc.outArgs['results']
- assert 'qpid.message_group_queue' in results
- q_info = results['qpid.message_group_queue']
- assert 'group_header_key' in q_info and q_info['group_header_key'] == "THE-GROUP"
- assert 'group_state' in q_info and len(q_info['group_state']) == 1
- g_info = q_info['group_state'][0]
- assert 'group_id' in g_info
- assert g_info['group_id'] == 'qpid.no-group'
-
- self.qmf_session.delBroker(self.qmf_broker)
-
-
-class StickyConsumerMsgGroupTests(Base):
- """
- Tests for the behavior of sticky-consumer message groups. These tests
- expect all messages from the same group be consumed by the same clients.
- See QPID-3347 for details.
- """
- pass # TBD