summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/tests_0-10/message.py138
1 files changed, 138 insertions, 0 deletions
diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py
index 74e2b6416f..b882cd5438 100644
--- a/python/tests_0-10/message.py
+++ b/python/tests_0-10/message.py
@@ -645,7 +645,145 @@ class MessageTests(TestBase):
self.assertDataEquals(channel, self.client.queue("consumer").get(timeout = 1), "abcdefghijklmnopqrstuvwxyz")
self.assertEmpty(self.client.queue("consumer"))
+
+ def test_credit_flow_messages(self):
+ """
+ Test basic credit based flow control with unit = message
+ """
+ #declare an exclusive queue
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True)
+ #create consumer (for now that defaults to infinite credit)
+ channel.message_consume(queue = "q", destination = "c")
+ channel.message_flow_mode(mode = 0, destination = "c")
+ #set credit to zero (can remove this once move to proper default for subscribe method)
+ channel.message_stop(destination = "c")
+ #send batch of messages to queue
+ for i in range(1, 11):
+ channel.message_transfer(routing_key = "q", body = "Message %d" % i)
+ #set message credit to finite amount (less than enough for all messages)
+ channel.message_flow(unit = 0, value = 5, destination = "c")
+ #set infinite byte credit
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
+ #check that expected number were received
+ q = self.client.queue("c")
+ for i in range(1, 6):
+ self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i)
+ self.assertEmpty(q)
+
+ #increase credit again and check more are received
+ for i in range(6, 11):
+ channel.message_flow(unit = 0, value = 1, destination = "c")
+ self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i)
+ self.assertEmpty(q)
+
+ def test_credit_flow_bytes(self):
+ """
+ Test basic credit based flow control with unit = bytes
+ """
+ #declare an exclusive queue
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True)
+ #create consumer (for now that defaults to infinite credit)
+ channel.message_consume(queue = "q", destination = "c")
+ channel.message_flow_mode(mode = 0, destination = "c")
+ #set credit to zero (can remove this once move to proper default for subscribe method)
+ channel.message_stop(destination = "c")
+ #send batch of messages to queue
+ for i in range(1, 11):
+ channel.message_transfer(routing_key = "q", body = "abcdefgh")
+
+ #each message is currently interpreted as requiring 75 bytes of credit
+ #set byte credit to finite amount (less than enough for all messages)
+ channel.message_flow(unit = 1, value = 75*5, destination = "c")
+ #set infinite message credit
+ channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c")
+ #check that expected number were received
+ q = self.client.queue("c")
+ for i in range(1, 6):
+ self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
+ self.assertEmpty(q)
+
+ #increase credit again and check more are received
+ for i in range(6, 11):
+ channel.message_flow(unit = 1, value = 75, destination = "c")
+ self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
+ self.assertEmpty(q)
+
+
+ def test_window_flow_messages(self):
+ """
+ Test basic window based flow control with unit = message
+ """
+ #declare an exclusive queue
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True)
+ #create consumer (for now that defaults to infinite credit)
+ channel.message_consume(queue = "q", destination = "c")
+ channel.message_flow_mode(mode = 1, destination = "c")
+ #set credit to zero (can remove this once move to proper default for subscribe method)
+ channel.message_stop(destination = "c")
+ #send batch of messages to queue
+ for i in range(1, 11):
+ channel.message_transfer(routing_key = "q", body = "Message %d" % i)
+
+ #set message credit to finite amount (less than enough for all messages)
+ channel.message_flow(unit = 0, value = 5, destination = "c")
+ #set infinite byte credit
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
+ #check that expected number were received
+ q = self.client.queue("c")
+ for i in range(1, 6):
+ msg = q.get(timeout = 1)
+ self.assertDataEquals(channel, msg, "Message %d" % i)
+ self.assertEmpty(q)
+
+ #acknowledge messages and check more are received
+ msg.complete(cumulative=True)
+ for i in range(6, 11):
+ self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i)
+ self.assertEmpty(q)
+
+
+ def test_window_flow_bytes(self):
+ """
+ Test basic window based flow control with unit = bytes
+ """
+ #declare an exclusive queue
+ channel = self.channel
+ channel.queue_declare(queue = "q", exclusive=True)
+ #create consumer (for now that defaults to infinite credit)
+ channel.message_consume(queue = "q", destination = "c")
+ channel.message_flow_mode(mode = 1, destination = "c")
+ #set credit to zero (can remove this once move to proper default for subscribe method)
+ channel.message_stop(destination = "c")
+ #send batch of messages to queue
+ for i in range(1, 11):
+ channel.message_transfer(routing_key = "q", body = "abcdefgh")
+
+ #each message is currently interpreted as requiring 75 bytes of credit
+ #set byte credit to finite amount (less than enough for all messages)
+ channel.message_flow(unit = 1, value = 75*5, destination = "c")
+ #set infinite message credit
+ channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c")
+ #check that expected number were received
+ q = self.client.queue("c")
+ msgs = []
+ for i in range(1, 6):
+ msg = q.get(timeout = 1)
+ msgs.append(msg)
+ self.assertDataEquals(channel, msg, "abcdefgh")
+ self.assertEmpty(q)
+
+ #ack each message individually and check more are received
+ for i in range(6, 11):
+ msg = msgs.pop()
+ msg.complete(cumulative=False)
+ self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
+ self.assertEmpty(q)
+
+
def assertDataEquals(self, channel, msg, expected):
if isinstance(msg.body, ReferenceId):