summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-10-10 15:24:34 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-10-10 15:24:34 +0000
commit306fe6cd338055144ffb1bd979e003afadee755c (patch)
tree8c3bbe463d0992d2ceddda717411699a54e4a898
parentb953a11b3ff105c4180a852e6fcbfc4222c53606 (diff)
downloadqpid-python-306fe6cd338055144ffb1bd979e003afadee755c.tar.gz
QPID-3346: fix capacity settings in tests, add transactioned sender test
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1181019 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/msg_groups.py156
1 files changed, 126 insertions, 30 deletions
diff --git a/tests/src/py/qpid_tests/broker_0_10/msg_groups.py b/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
index 611be0a6b0..99d11151e8 100644
--- a/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
+++ b/tests/src/py/qpid_tests/broker_0_10/msg_groups.py
@@ -64,9 +64,9 @@ class MultiConsumerMsgGroupTests(Base):
# create consumers on separate sessions: C1,C2
s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ c1 = s1.receiver("msg-group-q", options={"capacity":0})
s2 = self.setup_session()
- c2 = s2.receiver("msg-group-q", options={"capacity":1})
+ c2 = s2.receiver("msg-group-q", options={"capacity":0})
# C1 should acquire A-0, then C2 should acquire B-3
@@ -189,9 +189,9 @@ class MultiConsumerMsgGroupTests(Base):
# create consumer and browser
s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ c1 = s1.receiver("msg-group-q", options={"capacity":0})
s2 = self.setup_session()
- b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+ b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
# C1 should acquire A-0
@@ -242,9 +242,9 @@ class MultiConsumerMsgGroupTests(Base):
snd.send(m)
s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ c1 = s1.receiver("msg-group-q", options={"capacity":0})
s2 = self.setup_session()
- c2 = s2.receiver("msg-group-q", options={"capacity":1})
+ c2 = s2.receiver("msg-group-q", options={"capacity":0})
m1 = c1.fetch(0)
assert m1.properties['THE-GROUP'] == 'A'
@@ -280,9 +280,9 @@ class MultiConsumerMsgGroupTests(Base):
snd.send(m)
s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ c1 = s1.receiver("msg-group-q", options={"capacity":0})
s2 = self.setup_session()
- c2 = s2.receiver("msg-group-q", options={"capacity":1})
+ c2 = s2.receiver("msg-group-q", options={"capacity":0})
m1 = c1.fetch(0)
assert m1.properties['THE-GROUP'] == 'A'
@@ -317,9 +317,9 @@ class MultiConsumerMsgGroupTests(Base):
snd.send(m)
s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ c1 = s1.receiver("msg-group-q", options={"capacity":0})
s2 = self.setup_session()
- c2 = s2.receiver("msg-group-q", options={"capacity":1})
+ c2 = s2.receiver("msg-group-q", options={"capacity":0})
# C1 will own group A
m1 = c1.fetch(0)
@@ -383,9 +383,9 @@ class MultiConsumerMsgGroupTests(Base):
snd.send(m)
s1 = self.conn.session(transactional=True)
- c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ c1 = s1.receiver("msg-group-q", options={"capacity":0})
s2 = self.conn.session(transactional=True)
- c2 = s2.receiver("msg-group-q", options={"capacity":1})
+ c2 = s2.receiver("msg-group-q", options={"capacity":0})
# C1 gets group A
m1 = c1.fetch(0)
@@ -400,8 +400,8 @@ class MultiConsumerMsgGroupTests(Base):
s1.acknowledge(m1) # A-0 consumed, A group freed
s2.acknowledge(m2) # B-2 consumed, B group freed
- s1.commit()
- s2.rollback() # release B-2 and group B
+ s1.commit() # A-0 consumption done, A group now free
+ s2.rollback() # releases B-2, and group B
## Q: ["A1","B2","B3","A4","B5"]
@@ -477,6 +477,102 @@ class MultiConsumerMsgGroupTests(Base):
s2.acknowledge()
s2.commit()
+ def test_send_transaction(self):
+ """ Verify behavior when sender is using transactions.
+ """
+ ssn = self.conn.session(transactional=True)
+ snd = ssn.sender("msg-group-q; {create:always, delete:sender," +
+ " node: {x-declare: {arguments:" +
+ " {'qpid.group_header_key':'THE-GROUP'," +
+ "'qpid.shared_msg_group':1}}}}")
+
+ msg = Message(content={'index':0}, properties={"THE-GROUP": "A"})
+ snd.send(msg)
+ msg = Message(content={'index':1}, properties={"THE-GROUP": "B"})
+ snd.send(msg)
+ snd.session.commit()
+ msg = Message(content={'index':2}, properties={"THE-GROUP": "A"})
+ snd.send(msg)
+
+ # Queue: [A0,B1, (uncommitted: A2) ]
+
+ s1 = self.conn.session(transactional=True)
+ c1 = s1.receiver("msg-group-q", options={"capacity":0})
+ s2 = self.conn.session(transactional=True)
+ c2 = s2.receiver("msg-group-q", options={"capacity":0})
+
+ # C1 gets A0, group A
+ m1 = c1.fetch(0)
+ assert m1.properties['THE-GROUP'] == 'A'
+ assert m1.content['index'] == 0
+
+ # C2 gets B2, group B
+ m2 = c2.fetch(0)
+ assert m2.properties['THE-GROUP'] == 'B'
+ assert m2.content['index'] == 1
+
+ # Since A2 uncommitted, there should be nothing left to fetch
+ try:
+ mX = c1.fetch(0)
+ assert False # should not get here
+ except Empty:
+ pass
+ try:
+ mX = c2.fetch(0)
+ assert False # should not get here
+ except Empty:
+ pass
+
+ snd.session.commit()
+ msg = Message(content={'index':3}, properties={"THE-GROUP": "B"})
+ snd.send(msg)
+
+ # Queue: [A2, (uncommitted: B3) ]
+
+ # B3 has yet to be committed, so C2 should see nothing available:
+ try:
+ mX = c2.fetch(0)
+ assert False # should not get here
+ except Empty:
+ pass
+
+ # but A2 should be available to C1
+ m3 = c1.fetch(0)
+ assert m3.properties['THE-GROUP'] == 'A'
+ assert m3.content['index'] == 2
+
+ # now make B3 available
+ snd.session.commit()
+
+ # C1 should still be done:
+ try:
+ mX = c1.fetch(0)
+ assert False # should not get here
+ except Empty:
+ pass
+
+ # but C2 should find the new B
+ m4 = c2.fetch(0)
+ assert m4.properties['THE-GROUP'] == 'B'
+ assert m4.content['index'] == 3
+
+ # extra: have C1 rollback, verify C2 finds the released 'A' messages
+ c1.session.rollback()
+
+ ## Q: ["A0","A2"]
+
+ # C2 should be able to get the next A
+ m5 = c2.fetch(0)
+ assert m5.properties['THE-GROUP'] == 'A'
+ assert m5.content['index'] == 0
+
+ m6 = c2.fetch(0)
+ assert m6.properties['THE-GROUP'] == 'A'
+ assert m6.content['index'] == 2
+
+ c2.session.acknowledge()
+ c2.session.commit()
+
def test_query(self):
""" Verify the queue query method against message groups
"""
@@ -494,9 +590,9 @@ class MultiConsumerMsgGroupTests(Base):
snd.send(m)
s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ c1 = s1.receiver("msg-group-q", options={"capacity":0})
s2 = self.setup_session()
- c2 = s2.receiver("msg-group-q", options={"capacity":1})
+ c2 = s2.receiver("msg-group-q", options={"capacity":0})
m1 = c1.fetch(0)
m2 = c2.fetch(0)
@@ -568,7 +664,7 @@ class MultiConsumerMsgGroupTests(Base):
# verify all B's removed....
s2 = self.setup_session()
- b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+ b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
count = 0
try:
while True:
@@ -599,7 +695,7 @@ class MultiConsumerMsgGroupTests(Base):
# acquire group "A"
s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ c1 = s1.receiver("msg-group-q", options={"capacity":0})
m1 = c1.fetch(0)
assert m1.properties['THE-GROUP'] == 'A'
assert m1.content['index'] == 0
@@ -620,7 +716,7 @@ class MultiConsumerMsgGroupTests(Base):
# verify all other A's removed....
s2 = self.setup_session()
- b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+ b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
count = 0
try:
while True:
@@ -652,7 +748,7 @@ class MultiConsumerMsgGroupTests(Base):
# acquire group "A"
s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ c1 = s1.receiver("msg-group-q", options={"capacity":0})
m1 = c1.fetch(0)
assert m1.properties['THE-GROUP'] == 'A'
assert m1.content['index'] == 0
@@ -673,7 +769,7 @@ class MultiConsumerMsgGroupTests(Base):
# verify all other A's removed....
s2 = self.setup_session()
- b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+ b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
count = 0
a_count = 0
try:
@@ -714,7 +810,7 @@ class MultiConsumerMsgGroupTests(Base):
# acquire group "A"
s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ c1 = s1.receiver("msg-group-q", options={"capacity":0})
m1 = c1.fetch(0)
assert m1.properties['THE-GROUP'] == 'A'
assert m1.content['index'] == 0
@@ -733,7 +829,7 @@ class MultiConsumerMsgGroupTests(Base):
# verify all other A's removed from msg-group-q
s2 = self.setup_session()
- b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+ b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
count = 0
try:
while True:
@@ -746,7 +842,7 @@ class MultiConsumerMsgGroupTests(Base):
# verify the moved A's are at the dest-q
s2 = self.setup_session()
- b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":1})
+ b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":0})
count = 0
try:
while True:
@@ -797,7 +893,7 @@ class MultiConsumerMsgGroupTests(Base):
# verify all B's removed from msg-group-q
s2 = self.setup_session()
- b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+ b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
count = 0
try:
while True:
@@ -810,7 +906,7 @@ class MultiConsumerMsgGroupTests(Base):
# verify the moved B's are at the dest-q
s2 = self.setup_session()
- b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":1})
+ b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":0})
count = 0
try:
while True:
@@ -846,7 +942,7 @@ class MultiConsumerMsgGroupTests(Base):
# acquire group "A"
s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ c1 = s1.receiver("msg-group-q", options={"capacity":0})
m1 = c1.fetch(0)
assert m1.properties['THE-GROUP'] == 'A'
assert m1.content['index'] == 0
@@ -867,7 +963,7 @@ class MultiConsumerMsgGroupTests(Base):
# verify all other A's removed....
s2 = self.setup_session()
- b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":1})
+ b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
count = 0
try:
while True:
@@ -917,9 +1013,9 @@ class MultiConsumerMsgGroupTests(Base):
# create consumers
s1 = self.setup_session()
- c1 = s1.receiver("msg-group-q", options={"capacity":1})
+ c1 = s1.receiver("msg-group-q", options={"capacity":0})
s2 = self.setup_session()
- c2 = s2.receiver("msg-group-q", options={"capacity":1})
+ c2 = s2.receiver("msg-group-q", options={"capacity":0})
# C1 should acquire A-0
m1 = c1.fetch(0);