summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2011-09-15 17:03:11 +0000
committerGordon Sim <gsim@apache.org>2011-09-15 17:03:11 +0000
commitfaf6b5dc915c07a37c2fdaef2a758eb4656cec04 (patch)
treec7f4f2e379dc34b4a56c3b7e3442cfa1724049ac
parent852080fb799f0ac807b9553da31a4c5e2f475887 (diff)
downloadqpid-python-faf6b5dc915c07a37c2fdaef2a758eb4656cec04.tar.gz
QPID-3488: Added test case
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1171175 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/message.py41
1 files changed, 41 insertions, 0 deletions
diff --git a/tests/src/py/qpid_tests/broker_0_10/message.py b/tests/src/py/qpid_tests/broker_0_10/message.py
index b46c446833..89ba936b05 100644
--- a/tests/src/py/qpid_tests/broker_0_10/message.py
+++ b/tests/src/py/qpid_tests/broker_0_10/message.py
@@ -508,6 +508,47 @@ class MessageTests(TestBase010):
msgB = q.get(timeout=10)
+ def test_window_stop(self):
+ """
+ Ensure window based flow control reacts to stop correctly
+ """
+ session = self.session
+ #setup subscriber on a test queue
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ session.message_subscribe(queue = "q", destination = "c")
+ session.message_set_flow_mode(flow_mode = 1, destination = "c")
+ session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c")
+
+
+ #send batch of messages to queue
+ for i in range(0, 10):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % (i+1)))
+
+ #retrieve all delivered messages
+ q = session.incoming("c")
+ for i in range(0, 5):
+ msg = q.get(timeout = 1)
+ session.receiver._completed.add(msg.id)#TODO: this may be done automatically
+ self.assertDataEquals(session, msg, "Message %d" % (i+1))
+
+ session.message_stop(destination = "c")
+
+ #now send completions, normally used to move window forward,
+ #but after a stop should not do so
+ session.channel.session_completed(session.receiver._completed)
+
+ #check no more messages are sent
+ self.assertEmpty(q)
+
+ #re-establish window and check remaining messages
+ session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c")
+ for i in range(0, 5):
+ msg = q.get(timeout = 1)
+ self.assertDataEquals(session, msg, "Message %d" % (i+6))
+
+
def test_subscribe_not_acquired(self):
"""
Test the not-acquired modes works as expected for a simple case