summaryrefslogtreecommitdiff
path: root/qpid/python/tests_0-10/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python/tests_0-10/message.py')
-rw-r--r--qpid/python/tests_0-10/message.py112
1 files changed, 74 insertions, 38 deletions
diff --git a/qpid/python/tests_0-10/message.py b/qpid/python/tests_0-10/message.py
index cbcef5602f..f80eca6363 100644
--- a/qpid/python/tests_0-10/message.py
+++ b/qpid/python/tests_0-10/message.py
@@ -230,8 +230,8 @@ class MessageTests(TestBase010):
session.message_subscribe(destination="my-consumer", queue="test-queue-4")
myqueue = session.incoming("my-consumer")
- session.message_flow(destination="my-consumer", unit=session.credit_unit.message, value=0xFFFFFFFF)
- session.message_flow(destination="my-consumer", unit=session.credit_unit.byte, value=0xFFFFFFFF)
+ session.message_flow(destination="my-consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="my-consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
#should flush here
@@ -258,8 +258,8 @@ class MessageTests(TestBase010):
session.queue_declare(queue="test-ack-queue", auto_delete=True)
session.message_subscribe(queue = "test-ack-queue", destination = "consumer")
- session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFF)
- session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFF)
+ session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
queue = session.incoming("consumer")
delivery_properties = session.delivery_properties(routing_key="test-ack-queue")
@@ -289,8 +289,8 @@ class MessageTests(TestBase010):
session.close(timeout=10)
session = self.session
- session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFF)
- session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFF)
+ session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
queue = session.incoming("checker")
msg3b = queue.get(timeout=1)
@@ -311,16 +311,16 @@ class MessageTests(TestBase010):
session.exchange_bind(queue = "r", exchange = "amq.fanout")
session.message_subscribe(queue = "q", destination = "consumer")
- session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFF)
- session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFF)
+ session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "blah, blah"))
msg = session.incoming("consumer").get(timeout = 1)
self.assertEquals(msg.body, "blah, blah")
session.message_reject(RangedSet(msg.id))
session.message_subscribe(queue = "r", destination = "checker")
- session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFF)
- session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFF)
+ session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
msg = session.incoming("checker").get(timeout = 1)
self.assertEquals(msg.body, "blah, blah")
@@ -341,7 +341,7 @@ class MessageTests(TestBase010):
#set message credit to finite amount (less than enough for all messages)
session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c")
#set infinite byte credit
- session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "c")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c")
#check that expected number were received
q = session.incoming("c")
for i in range(1, 6):
@@ -374,7 +374,7 @@ class MessageTests(TestBase010):
#set byte credit to finite amount (less than enough for all messages)
session.message_flow(unit = session.credit_unit.byte, value = msg_size*5, destination = "c")
#set infinite message credit
- session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFF, destination = "c")
+ session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "c")
#check that expected number were received
q = session.incoming("c")
for i in range(5):
@@ -405,7 +405,7 @@ class MessageTests(TestBase010):
#set message credit to finite amount (less than enough for all messages)
session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c")
#set infinite byte credit
- session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "c")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c")
#check that expected number were received
q = session.incoming("c")
for i in range(1, 6):
@@ -443,7 +443,7 @@ class MessageTests(TestBase010):
#set byte credit to finite amount (less than enough for all messages)
session.message_flow(unit = session.credit_unit.byte, value = msg_size*5, destination = "c")
#set infinite message credit
- session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFF, destination = "c")
+ session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "c")
#check that expected number were received
q = session.incoming("c")
msgs = []
@@ -462,6 +462,42 @@ class MessageTests(TestBase010):
self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh")
self.assertEmpty(q)
+ def test_window_flush_ack_flow(self):
+ """
+ Test basic window based flow control with unit = bytes
+ """
+ #declare an exclusive queue
+ ssn = self.session
+ ssn.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ #create consumer
+ ssn.message_subscribe(queue = "q", destination = "c",
+ accept_mode=ssn.accept_mode.explicit)
+ ssn.message_set_flow_mode(flow_mode = ssn.flow_mode.window, destination = "c")
+
+ #send message A
+ ssn.message_transfer(message=Message(ssn.delivery_properties(routing_key="q"), "A"))
+
+ for unit in ssn.credit_unit.values():
+ ssn.message_flow("c", unit, 0xFFFFFFFFL)
+
+ q = ssn.incoming("c")
+ msgA = q.get(timeout=10)
+
+ ssn.message_flush(destination="c")
+
+ # XXX
+ ssn.receiver._completed.add(msgA.id)
+ ssn.channel.session_completed(ssn.receiver._completed)
+ ssn.message_accept(RangedSet(msgA.id))
+
+ for unit in ssn.credit_unit.values():
+ ssn.message_flow("c", unit, 0xFFFFFFFFL)
+
+ #send message B
+ ssn.message_transfer(message=Message(ssn.delivery_properties(routing_key="q"), "B"))
+
+ msgB = q.get(timeout=10)
+
def test_subscribe_not_acquired(self):
"""
Test the not-acquired modes works as expected for a simple case
@@ -472,11 +508,11 @@ class MessageTests(TestBase010):
session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i))
session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
- session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFF, destination = "a")
- session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
session.message_subscribe(queue = "q", destination = "b", acquire_mode = 1)
- session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFF, destination = "b")
- session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "b")
+ session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "b")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b")
for i in range(6, 11):
session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i))
@@ -508,8 +544,8 @@ class MessageTests(TestBase010):
session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1, accept_mode = 1)
session.message_set_flow_mode(flow_mode = session.flow_mode.credit, destination = "a")
- session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFF, destination = "a")
- session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
msg = session.incoming("a").get(timeout = 1)
self.assertEquals("acquire me", msg.body)
#message should still be on the queue:
@@ -532,8 +568,8 @@ class MessageTests(TestBase010):
session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me"))
session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
- session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFF)
- session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFF)
+ session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
msg = session.incoming("a").get(timeout = 1)
self.assertEquals("acquire me", msg.body)
#message should still be on the queue:
@@ -558,8 +594,8 @@ class MessageTests(TestBase010):
session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "release me"))
session.message_subscribe(queue = "q", destination = "a")
- session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFF)
- session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFF)
+ session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
msg = session.incoming("a").get(timeout = 1)
self.assertEquals("release me", msg.body)
session.message_cancel(destination = "a")
@@ -579,7 +615,7 @@ class MessageTests(TestBase010):
session.message_subscribe(queue = "q", destination = "a")
session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
- session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
queue = session.incoming("a")
first = queue.get(timeout = 1)
for i in range(2, 10):
@@ -612,7 +648,7 @@ class MessageTests(TestBase010):
session.message_subscribe(queue = "q", destination = "a")
session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
- session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
queue = session.incoming("a")
ids = []
for i in range (1, 11):
@@ -637,8 +673,8 @@ class MessageTests(TestBase010):
session.close(timeout=10)
session = self.session
- session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFF)
- session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFF)
+ session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
queue = session.incoming("checker")
self.assertEquals("message 4", queue.get(timeout = 1).body)
@@ -656,7 +692,7 @@ class MessageTests(TestBase010):
session.message_subscribe(queue = "q", destination = "a")
session.message_set_flow_mode(flow_mode = 0, destination = "a")
session.message_flow(unit = session.credit_unit.message, value = 5, destination = "a")
- session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
queue = session.incoming("a")
for i in range(1, 6):
@@ -671,7 +707,7 @@ class MessageTests(TestBase010):
#now create a not-acquired subscriber
session.message_subscribe(queue = "q", destination = "b", acquire_mode=1)
- session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "b")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b")
#check it gets those not consumed
queue = session.incoming("b")
@@ -699,7 +735,7 @@ class MessageTests(TestBase010):
#create a not-acquired subscriber
session.message_subscribe(queue = "q", destination = "a", acquire_mode=1)
- session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
#browse through messages
@@ -721,7 +757,7 @@ class MessageTests(TestBase010):
#create a second not-acquired subscriber
session.message_subscribe(queue = "q", destination = "b", acquire_mode=1)
- session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "b")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b")
session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b")
#check it gets those not consumed
queue = session.incoming("b")
@@ -748,12 +784,12 @@ class MessageTests(TestBase010):
#create two 'browsers'
session.message_subscribe(queue = "q", destination = "a", acquire_mode=1)
- session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
queueA = session.incoming("a")
session.message_subscribe(queue = "q", destination = "b", acquire_mode=1)
- session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "b")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b")
session.message_flow(unit = session.credit_unit.message, value = 10, destination = "b")
queueB = session.incoming("b")
@@ -770,7 +806,7 @@ class MessageTests(TestBase010):
#create consumer
session.message_subscribe(queue = "q", destination = "c")
- session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = "c")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c")
session.message_flow(unit = session.credit_unit.message, value = 10, destination = "c")
queueC = session.incoming("c")
#consume the message then ack it
@@ -787,8 +823,8 @@ class MessageTests(TestBase010):
consumer_tag = "tag1"
session.message_subscribe(queue="xyz", destination=consumer_tag)
- session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFF, destination = consumer_tag)
- session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFF, destination = consumer_tag)
+ session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = consumer_tag)
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag)
queue = session.incoming(consumer_tag)
msg = queue.get(timeout=1)
self.assertEquals("", msg.body)
@@ -827,7 +863,7 @@ class MessageTests(TestBase010):
messages = session.incoming(d)
sleep(1)
session.message_flow(unit = session.credit_unit.message, value=2, destination=d)
- session.message_flow(unit = session.credit_unit.byte, value=0xFFFFFFFF, destination=d)
+ session.message_flow(unit = session.credit_unit.byte, value=0xFFFFFFFFL, destination=d)
assert messages.get(timeout=1).body == "second"
self.assertEmpty(messages)