summaryrefslogtreecommitdiff
path: root/qpid/python/tests/connection.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-05-09 18:40:13 +0000
committerRafael H. Schloming <rhs@apache.org>2008-05-09 18:40:13 +0000
commita91fe82ffdb32f0db050c8c071379281295e5ca8 (patch)
tree719c86b1b448ee9d7df40cd24ce1c4f9fe2b366b /qpid/python/tests/connection.py
parent485022ac7cd72b40cb4c99f2e27389d016a31371 (diff)
downloadqpid-python-a91fe82ffdb32f0db050c8c071379281295e5ca8.tar.gz
QPID-1045: always notify incoming message queues of session closure and provide API for notifying listeners of closure; also preserve connection close code and report in errors
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@654907 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python/tests/connection.py')
-rw-r--r--qpid/python/tests/connection.py68
1 files changed, 66 insertions, 2 deletions
diff --git a/qpid/python/tests/connection.py b/qpid/python/tests/connection.py
index 6925480ed3..88620bc1c6 100644
--- a/qpid/python/tests/connection.py
+++ b/qpid/python/tests/connection.py
@@ -51,8 +51,16 @@ class TestSession(Delegate):
def queue_query(self, qq):
return qq._type.result.type.new((qq.queue,), {})
- def message_transfer(self, cmd, header, body):
- self.queue.put((cmd, header, body))
+ def message_transfer(self, cmd, headers, body):
+ if cmd.destination == "echo":
+ m = Message(body)
+ m.headers = headers
+ self.session.message_transfer(cmd.destination, cmd.accept_mode,
+ cmd.acquire_mode, m)
+ elif cmd.destination == "abort":
+ self.session.channel.connection.sock.close()
+ else:
+ self.queue.put((cmd, headers, body))
class ConnectionTest(TestCase):
@@ -134,3 +142,59 @@ class ConnectionTest(TestCase):
qq = ssn.queue_query("asdf")
assert qq.queue == "asdf"
c.close(5)
+
+ def testCloseGet(self):
+ c = Connection(connect("0.0.0.0", PORT), self.spec)
+ c.start(10)
+ ssn = c.session("test", timeout=10)
+ echos = ssn.incoming("echo")
+
+ for i in range(10):
+ ssn.message_transfer("echo", message=Message("test%d" % i))
+
+ ssn.auto_sync=False
+ ssn.message_transfer("abort")
+
+ for i in range(10):
+ m = echos.get(timeout=10)
+ assert m.body == "test%d" % i
+
+ try:
+ m = echos.get(timeout=10)
+ assert False
+ except Closed, e:
+ pass
+
+ def testCloseListen(self):
+ c = Connection(connect("0.0.0.0", PORT), self.spec)
+ c.start(10)
+ ssn = c.session("test", timeout=10)
+ echos = ssn.incoming("echo")
+
+ messages = []
+ exceptions = []
+ condition = Condition()
+ def listener(m): messages.append(m)
+ def exc_listener(e):
+ exceptions.append(e)
+ condition.acquire()
+ condition.notify()
+ condition.release()
+
+ echos.listen(listener, exc_listener)
+
+ for i in range(10):
+ ssn.message_transfer("echo", message=Message("test%d" % i))
+
+ ssn.auto_sync=False
+ ssn.message_transfer("abort")
+
+ condition.acquire()
+ condition.wait(10)
+ condition.release()
+
+ for i in range(10):
+ m = messages.pop(0)
+ assert m.body == "test%d" % i
+
+ assert len(exceptions) == 1