diff options
-rw-r--r-- | tests/src/py/qpid_tests/broker_0_10/msg_groups.py | 156 |
1 files changed, 126 insertions, 30 deletions
diff --git a/tests/src/py/qpid_tests/broker_0_10/msg_groups.py b/tests/src/py/qpid_tests/broker_0_10/msg_groups.py index 611be0a6b0..99d11151e8 100644 --- a/tests/src/py/qpid_tests/broker_0_10/msg_groups.py +++ b/tests/src/py/qpid_tests/broker_0_10/msg_groups.py @@ -64,9 +64,9 @@ class MultiConsumerMsgGroupTests(Base): # create consumers on separate sessions: C1,C2 s1 = self.setup_session() - c1 = s1.receiver("msg-group-q", options={"capacity":1}) + c1 = s1.receiver("msg-group-q", options={"capacity":0}) s2 = self.setup_session() - c2 = s2.receiver("msg-group-q", options={"capacity":1}) + c2 = s2.receiver("msg-group-q", options={"capacity":0}) # C1 should acquire A-0, then C2 should acquire B-3 @@ -189,9 +189,9 @@ class MultiConsumerMsgGroupTests(Base): # create consumer and browser s1 = self.setup_session() - c1 = s1.receiver("msg-group-q", options={"capacity":1}) + c1 = s1.receiver("msg-group-q", options={"capacity":0}) s2 = self.setup_session() - b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1}) + b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0}) # C1 should acquire A-0 @@ -242,9 +242,9 @@ class MultiConsumerMsgGroupTests(Base): snd.send(m) s1 = self.setup_session() - c1 = s1.receiver("msg-group-q", options={"capacity":1}) + c1 = s1.receiver("msg-group-q", options={"capacity":0}) s2 = self.setup_session() - c2 = s2.receiver("msg-group-q", options={"capacity":1}) + c2 = s2.receiver("msg-group-q", options={"capacity":0}) m1 = c1.fetch(0) assert m1.properties['THE-GROUP'] == 'A' @@ -280,9 +280,9 @@ class MultiConsumerMsgGroupTests(Base): snd.send(m) s1 = self.setup_session() - c1 = s1.receiver("msg-group-q", options={"capacity":1}) + c1 = s1.receiver("msg-group-q", options={"capacity":0}) s2 = self.setup_session() - c2 = s2.receiver("msg-group-q", options={"capacity":1}) + c2 = s2.receiver("msg-group-q", options={"capacity":0}) m1 = c1.fetch(0) assert m1.properties['THE-GROUP'] == 'A' @@ -317,9 +317,9 @@ class MultiConsumerMsgGroupTests(Base): snd.send(m) s1 = self.setup_session() - c1 = s1.receiver("msg-group-q", options={"capacity":1}) + c1 = s1.receiver("msg-group-q", options={"capacity":0}) s2 = self.setup_session() - c2 = s2.receiver("msg-group-q", options={"capacity":1}) + c2 = s2.receiver("msg-group-q", options={"capacity":0}) # C1 will own group A m1 = c1.fetch(0) @@ -383,9 +383,9 @@ class MultiConsumerMsgGroupTests(Base): snd.send(m) s1 = self.conn.session(transactional=True) - c1 = s1.receiver("msg-group-q", options={"capacity":1}) + c1 = s1.receiver("msg-group-q", options={"capacity":0}) s2 = self.conn.session(transactional=True) - c2 = s2.receiver("msg-group-q", options={"capacity":1}) + c2 = s2.receiver("msg-group-q", options={"capacity":0}) # C1 gets group A m1 = c1.fetch(0) @@ -400,8 +400,8 @@ class MultiConsumerMsgGroupTests(Base): s1.acknowledge(m1) # A-0 consumed, A group freed s2.acknowledge(m2) # B-2 consumed, B group freed - s1.commit() - s2.rollback() # release B-2 and group B + s1.commit() # A-0 consumption done, A group now free + s2.rollback() # releases B-2, and group B ## Q: ["A1","B2","B3","A4","B5"] @@ -477,6 +477,102 @@ class MultiConsumerMsgGroupTests(Base): s2.acknowledge() s2.commit() + def test_send_transaction(self): + """ Verify behavior when sender is using transactions. + """ + ssn = self.conn.session(transactional=True) + snd = ssn.sender("msg-group-q; {create:always, delete:sender," + + " node: {x-declare: {arguments:" + + " {'qpid.group_header_key':'THE-GROUP'," + + "'qpid.shared_msg_group':1}}}}") + + msg = Message(content={'index':0}, properties={"THE-GROUP": "A"}) + snd.send(msg) + msg = Message(content={'index':1}, properties={"THE-GROUP": "B"}) + snd.send(msg) + snd.session.commit() + msg = Message(content={'index':2}, properties={"THE-GROUP": "A"}) + snd.send(msg) + + # Queue: [A0,B1, (uncommitted: A2) ] + + s1 = self.conn.session(transactional=True) + c1 = s1.receiver("msg-group-q", options={"capacity":0}) + s2 = self.conn.session(transactional=True) + c2 = s2.receiver("msg-group-q", options={"capacity":0}) + + # C1 gets A0, group A + m1 = c1.fetch(0) + assert m1.properties['THE-GROUP'] == 'A' + assert m1.content['index'] == 0 + + # C2 gets B2, group B + m2 = c2.fetch(0) + assert m2.properties['THE-GROUP'] == 'B' + assert m2.content['index'] == 1 + + # Since A2 uncommitted, there should be nothing left to fetch + try: + mX = c1.fetch(0) + assert False # should not get here + except Empty: + pass + try: + mX = c2.fetch(0) + assert False # should not get here + except Empty: + pass + + snd.session.commit() + msg = Message(content={'index':3}, properties={"THE-GROUP": "B"}) + snd.send(msg) + + # Queue: [A2, (uncommitted: B3) ] + + # B3 has yet to be committed, so C2 should see nothing available: + try: + mX = c2.fetch(0) + assert False # should not get here + except Empty: + pass + + # but A2 should be available to C1 + m3 = c1.fetch(0) + assert m3.properties['THE-GROUP'] == 'A' + assert m3.content['index'] == 2 + + # now make B3 available + snd.session.commit() + + # C1 should still be done: + try: + mX = c1.fetch(0) + assert False # should not get here + except Empty: + pass + + # but C2 should find the new B + m4 = c2.fetch(0) + assert m4.properties['THE-GROUP'] == 'B' + assert m4.content['index'] == 3 + + # extra: have C1 rollback, verify C2 finds the released 'A' messages + c1.session.rollback() + + ## Q: ["A0","A2"] + + # C2 should be able to get the next A + m5 = c2.fetch(0) + assert m5.properties['THE-GROUP'] == 'A' + assert m5.content['index'] == 0 + + m6 = c2.fetch(0) + assert m6.properties['THE-GROUP'] == 'A' + assert m6.content['index'] == 2 + + c2.session.acknowledge() + c2.session.commit() + def test_query(self): """ Verify the queue query method against message groups """ @@ -494,9 +590,9 @@ class MultiConsumerMsgGroupTests(Base): snd.send(m) s1 = self.setup_session() - c1 = s1.receiver("msg-group-q", options={"capacity":1}) + c1 = s1.receiver("msg-group-q", options={"capacity":0}) s2 = self.setup_session() - c2 = s2.receiver("msg-group-q", options={"capacity":1}) + c2 = s2.receiver("msg-group-q", options={"capacity":0}) m1 = c1.fetch(0) m2 = c2.fetch(0) @@ -568,7 +664,7 @@ class MultiConsumerMsgGroupTests(Base): # verify all B's removed.... s2 = self.setup_session() - b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1}) + b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0}) count = 0 try: while True: @@ -599,7 +695,7 @@ class MultiConsumerMsgGroupTests(Base): # acquire group "A" s1 = self.setup_session() - c1 = s1.receiver("msg-group-q", options={"capacity":1}) + c1 = s1.receiver("msg-group-q", options={"capacity":0}) m1 = c1.fetch(0) assert m1.properties['THE-GROUP'] == 'A' assert m1.content['index'] == 0 @@ -620,7 +716,7 @@ class MultiConsumerMsgGroupTests(Base): # verify all other A's removed.... s2 = self.setup_session() - b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1}) + b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0}) count = 0 try: while True: @@ -652,7 +748,7 @@ class MultiConsumerMsgGroupTests(Base): # acquire group "A" s1 = self.setup_session() - c1 = s1.receiver("msg-group-q", options={"capacity":1}) + c1 = s1.receiver("msg-group-q", options={"capacity":0}) m1 = c1.fetch(0) assert m1.properties['THE-GROUP'] == 'A' assert m1.content['index'] == 0 @@ -673,7 +769,7 @@ class MultiConsumerMsgGroupTests(Base): # verify all other A's removed.... s2 = self.setup_session() - b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1}) + b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0}) count = 0 a_count = 0 try: @@ -714,7 +810,7 @@ class MultiConsumerMsgGroupTests(Base): # acquire group "A" s1 = self.setup_session() - c1 = s1.receiver("msg-group-q", options={"capacity":1}) + c1 = s1.receiver("msg-group-q", options={"capacity":0}) m1 = c1.fetch(0) assert m1.properties['THE-GROUP'] == 'A' assert m1.content['index'] == 0 @@ -733,7 +829,7 @@ class MultiConsumerMsgGroupTests(Base): # verify all other A's removed from msg-group-q s2 = self.setup_session() - b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1}) + b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0}) count = 0 try: while True: @@ -746,7 +842,7 @@ class MultiConsumerMsgGroupTests(Base): # verify the moved A's are at the dest-q s2 = self.setup_session() - b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":1}) + b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":0}) count = 0 try: while True: @@ -797,7 +893,7 @@ class MultiConsumerMsgGroupTests(Base): # verify all B's removed from msg-group-q s2 = self.setup_session() - b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1}) + b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0}) count = 0 try: while True: @@ -810,7 +906,7 @@ class MultiConsumerMsgGroupTests(Base): # verify the moved B's are at the dest-q s2 = self.setup_session() - b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":1}) + b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":0}) count = 0 try: while True: @@ -846,7 +942,7 @@ class MultiConsumerMsgGroupTests(Base): # acquire group "A" s1 = self.setup_session() - c1 = s1.receiver("msg-group-q", options={"capacity":1}) + c1 = s1.receiver("msg-group-q", options={"capacity":0}) m1 = c1.fetch(0) assert m1.properties['THE-GROUP'] == 'A' assert m1.content['index'] == 0 @@ -867,7 +963,7 @@ class MultiConsumerMsgGroupTests(Base): # verify all other A's removed.... s2 = self.setup_session() - b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1}) + b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0}) count = 0 try: while True: @@ -917,9 +1013,9 @@ class MultiConsumerMsgGroupTests(Base): # create consumers s1 = self.setup_session() - c1 = s1.receiver("msg-group-q", options={"capacity":1}) + c1 = s1.receiver("msg-group-q", options={"capacity":0}) s2 = self.setup_session() - c2 = s2.receiver("msg-group-q", options={"capacity":1}) + c2 = s2.receiver("msg-group-q", options={"capacity":0}) # C1 should acquire A-0 m1 = c1.fetch(0); |