diff options
author | Rafael H. Schloming <rhs@apache.org> | 2008-05-09 18:40:13 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2008-05-09 18:40:13 +0000 |
commit | a91fe82ffdb32f0db050c8c071379281295e5ca8 (patch) | |
tree | 719c86b1b448ee9d7df40cd24ce1c4f9fe2b366b /qpid/python/tests/connection.py | |
parent | 485022ac7cd72b40cb4c99f2e27389d016a31371 (diff) | |
download | qpid-python-a91fe82ffdb32f0db050c8c071379281295e5ca8.tar.gz |
QPID-1045: always notify incoming message queues of session closure and provide API for notifying listeners of closure; also preserve connection close code and report in errors
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@654907 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python/tests/connection.py')
-rw-r--r-- | qpid/python/tests/connection.py | 68 |
1 files changed, 66 insertions, 2 deletions
diff --git a/qpid/python/tests/connection.py b/qpid/python/tests/connection.py index 6925480ed3..88620bc1c6 100644 --- a/qpid/python/tests/connection.py +++ b/qpid/python/tests/connection.py @@ -51,8 +51,16 @@ class TestSession(Delegate): def queue_query(self, qq): return qq._type.result.type.new((qq.queue,), {}) - def message_transfer(self, cmd, header, body): - self.queue.put((cmd, header, body)) + def message_transfer(self, cmd, headers, body): + if cmd.destination == "echo": + m = Message(body) + m.headers = headers + self.session.message_transfer(cmd.destination, cmd.accept_mode, + cmd.acquire_mode, m) + elif cmd.destination == "abort": + self.session.channel.connection.sock.close() + else: + self.queue.put((cmd, headers, body)) class ConnectionTest(TestCase): @@ -134,3 +142,59 @@ class ConnectionTest(TestCase): qq = ssn.queue_query("asdf") assert qq.queue == "asdf" c.close(5) + + def testCloseGet(self): + c = Connection(connect("0.0.0.0", PORT), self.spec) + c.start(10) + ssn = c.session("test", timeout=10) + echos = ssn.incoming("echo") + + for i in range(10): + ssn.message_transfer("echo", message=Message("test%d" % i)) + + ssn.auto_sync=False + ssn.message_transfer("abort") + + for i in range(10): + m = echos.get(timeout=10) + assert m.body == "test%d" % i + + try: + m = echos.get(timeout=10) + assert False + except Closed, e: + pass + + def testCloseListen(self): + c = Connection(connect("0.0.0.0", PORT), self.spec) + c.start(10) + ssn = c.session("test", timeout=10) + echos = ssn.incoming("echo") + + messages = [] + exceptions = [] + condition = Condition() + def listener(m): messages.append(m) + def exc_listener(e): + exceptions.append(e) + condition.acquire() + condition.notify() + condition.release() + + echos.listen(listener, exc_listener) + + for i in range(10): + ssn.message_transfer("echo", message=Message("test%d" % i)) + + ssn.auto_sync=False + ssn.message_transfer("abort") + + condition.acquire() + condition.wait(10) + condition.release() + + for i in range(10): + m = messages.pop(0) + assert m.body == "test%d" % i + + assert len(exceptions) == 1 |