diff options
author | Rafael H. Schloming <rhs@apache.org> | 2008-05-09 18:40:13 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2008-05-09 18:40:13 +0000 |
commit | a91fe82ffdb32f0db050c8c071379281295e5ca8 (patch) | |
tree | 719c86b1b448ee9d7df40cd24ce1c4f9fe2b366b | |
parent | 485022ac7cd72b40cb4c99f2e27389d016a31371 (diff) | |
download | qpid-python-a91fe82ffdb32f0db050c8c071379281295e5ca8.tar.gz |
QPID-1045: always notify incoming message queues of session closure and provide API for notifying listeners of closure; also preserve connection close code and report in errors
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@654907 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/python/qpid/connection.py | 31 | ||||
-rw-r--r-- | qpid/python/qpid/delegates.py | 17 | ||||
-rw-r--r-- | qpid/python/qpid/exceptions.py | 1 | ||||
-rw-r--r-- | qpid/python/qpid/framer.py | 3 | ||||
-rw-r--r-- | qpid/python/qpid/queue.py | 21 | ||||
-rw-r--r-- | qpid/python/qpid/session.py | 44 | ||||
-rw-r--r-- | qpid/python/tests/connection.py | 68 |
7 files changed, 138 insertions, 47 deletions
diff --git a/qpid/python/qpid/connection.py b/qpid/python/qpid/connection.py index 4ed430249b..8d0e115458 100644 --- a/qpid/python/qpid/connection.py +++ b/qpid/python/qpid/connection.py @@ -19,8 +19,7 @@ import datatypes, session from threading import Thread, Condition, RLock -from util import wait -from framer import Closed +from util import wait, notify from assembler import Assembler, Segment from codec010 import StringCodec from session import Session @@ -39,11 +38,11 @@ class SessionBusy(Exception): pass class ConnectionFailed(Exception): pass -def client(*args): - return delegates.Client(*args) +def client(*args, **kwargs): + return delegates.Client(*args, **kwargs) -def server(*args): - return delegates.Server(*args) +def server(*args, **kwargs): + return delegates.Server(*args, **kwargs) class Connection(Assembler): @@ -61,13 +60,14 @@ class Connection(Assembler): self.condition = Condition() self.opened = False self.failed = False + self.close_code = (None, "connection aborted") self.thread = Thread(target=self.run) self.thread.setDaemon(True) self.channel_max = 65535 - self.delegate = delegate(self, args) + self.delegate = delegate(self, **args) def attach(self, name, ch, delegate, force=False): self.lock.acquire() @@ -101,6 +101,8 @@ class Connection(Assembler): ssn = self.sessions.pop(name, None) if ssn is not None: ssn.channel = None + ssn.closed() + notify(ssn.condition) return ssn finally: self.lock.release() @@ -127,13 +129,23 @@ class Connection(Assembler): finally: self.lock.release() + def detach_all(self): + self.lock.acquire() + try: + for ssn in self.attached.values(): + if self.close_code[0] != 200: + ssn.exceptions.append(self.close_code) + self.detach(ssn.name, ssn.channel) + finally: + self.lock.release() + def start(self, timeout=None): self.delegate.start() self.thread.start() if not wait(self.condition, lambda: self.opened or self.failed, timeout): raise Timeout() - if (self.failed): - raise ConnectionFailed() + if self.failed: + raise ConnectionFailed(*self.close_code) def run(self): # XXX: we don't really have a good way to exit this loop without @@ -142,6 +154,7 @@ class Connection(Assembler): try: seg = self.read_segment() except Closed: + self.detach_all() break self.delegate.received(seg) diff --git a/qpid/python/qpid/delegates.py b/qpid/python/qpid/delegates.py index cdff132219..6e6b55e36a 100644 --- a/qpid/python/qpid/delegates.py +++ b/qpid/python/qpid/delegates.py @@ -50,6 +50,7 @@ class Delegate: ssn.received(seg) def connection_close(self, ch, close): + self.connection.close_code = (close.reply_code, close.reply_text) ch.connection_close_ok() self.connection.sock.close() if not self.connection.opened: @@ -73,13 +74,11 @@ class Delegate: notify(ch.session.condition) def session_detach(self, ch, d): - self.connection.detach(d.name, ch) + ssn = self.connection.detach(d.name, ch) ch.session_detached(d.name) def session_detached(self, ch, d): - ssn = self.connection.detach(d.name, ch) - if ssn is not None: - notify(ch.session.condition) + self.connection.detach(d.name, ch) def session_command_point(self, ch, cp): ssn = ch.session @@ -127,11 +126,11 @@ class Client(Delegate): "version": "development", "platform": os.name} - def __init__(self, connection, args={}): - Delegate.__init__(self, connection) - self.username = args.get('username', 'guest') - self.password = args.get('password', 'guest') - self.mechanism = args.get('mechanism', 'PLAIN') + def __init__(self, connection, username="guest", password="guest", mechanism="PLAIN"): + Delegate.__init__(self, connection) + self.username = username + self.password = password + self.mechanism = mechanism def start(self): self.connection.write_header(self.spec.major, self.spec.minor) diff --git a/qpid/python/qpid/exceptions.py b/qpid/python/qpid/exceptions.py index 2136793d3b..7eaaf81ed4 100644 --- a/qpid/python/qpid/exceptions.py +++ b/qpid/python/qpid/exceptions.py @@ -17,4 +17,5 @@ # under the License. # +class Closed(Exception): pass class Timeout(Exception): pass diff --git a/qpid/python/qpid/framer.py b/qpid/python/qpid/framer.py index 78a29235cb..27ea3287f0 100644 --- a/qpid/python/qpid/framer.py +++ b/qpid/python/qpid/framer.py @@ -18,6 +18,7 @@ # import struct, socket +from exceptions import Closed from packer import Packer from threading import Lock from logging import getLogger @@ -66,8 +67,6 @@ class Frame: self.channel, self.payload) -class Closed(Exception): pass - class FramingError(Exception): pass class Framer(Packer): diff --git a/qpid/python/qpid/queue.py b/qpid/python/qpid/queue.py index ea8f00d091..a8a5c0d9ad 100644 --- a/qpid/python/qpid/queue.py +++ b/qpid/python/qpid/queue.py @@ -25,8 +25,7 @@ content of a queue can be notified if the queue is no longer in use. from Queue import Queue as BaseQueue, Empty, Full from threading import Thread - -class Closed(Exception): pass +from exceptions import Closed class Queue(BaseQueue): @@ -37,6 +36,7 @@ class Queue(BaseQueue): BaseQueue.__init__(self, *args, **kwargs) self.error = None self.listener = None + self.exc_listener = None self.thread = None def close(self, error = None): @@ -53,15 +53,20 @@ class Queue(BaseQueue): else: return result - def listen(self, listener): + def listen(self, listener, exc_listener = None): + if listener is None and exc_listener is not None: + raise ValueError("cannot set exception listener without setting listener") + self.listener = listener - if listener == None: - if self.thread != None: + self.exc_listener = exc_listener + + if listener is None: + if self.thread is not None: self.put(Queue.STOP) self.thread.join() self.thread = None else: - if self.thread == None: + if self.thread is None: self.thread = Thread(target = self.run) self.thread.setDaemon(True) self.thread.start() @@ -72,5 +77,7 @@ class Queue(BaseQueue): o = self.get() if o == Queue.STOP: break self.listener(o) - except Closed: + except Closed, e: + if self.exc_listener is not None: + self.exc_listener(e) break diff --git a/qpid/python/qpid/session.py b/qpid/python/qpid/session.py index f8ac98b96e..a1103e0428 100644 --- a/qpid/python/qpid/session.py +++ b/qpid/python/qpid/session.py @@ -52,7 +52,8 @@ class Session(Invoker): self.timeout = timeout self.channel = None self.invoke_lock = Lock() - self.closed = False + self._closing = False + self._closed = False self.condition = Condition() @@ -82,7 +83,9 @@ class Session(Invoker): def error(self): exc = self.exceptions[:] - if len(exc) == 1: + if len(exc) == 0: + return None + elif len(exc) == 1: return exc[0] else: return tuple(exc) @@ -102,13 +105,31 @@ class Session(Invoker): def close(self, timeout=None): self.invoke_lock.acquire() try: - self.closed = True + self._closing = True self.channel.session_detach(self.name) finally: self.invoke_lock.release() if not wait(self.condition, lambda: self.channel is None, timeout): raise Timeout() + def closed(self): + self.lock.acquire() + try: + if self._closed: return + self._closed = True + + error = self.error() + for id in self.results: + f = self.results[id] + f.error(error) + self.results.clear() + + for q in self._incoming.values(): + q.close(error) + notify(self.condition) + finally: + self.lock.release() + def resolve_method(self, name): cmd = self.spec.instructions.get(name) if cmd is not None and cmd.track == self.spec["track.command"].value: @@ -136,7 +157,7 @@ class Session(Invoker): self.invoke_lock.release() def do_invoke(self, type, args, kwargs): - if self.closed: + if self._closing: raise SessionClosed() if self.channel == None: @@ -311,20 +332,7 @@ class Delegate: future.set(er.value) def execution_exception(self, ex): - self.session.lock.acquire() - try: - self.session.exceptions.append(ex) - error = self.session.error() - for id in self.session.results: - f = self.session.results[id] - f.error(error) - self.session.results.clear() - - for q in self.session._incoming.values(): - q.close(error) - notify(self.session.condition) - finally: - self.session.lock.release() + self.session.exceptions.append(ex) class Client(Delegate): diff --git a/qpid/python/tests/connection.py b/qpid/python/tests/connection.py index 6925480ed3..88620bc1c6 100644 --- a/qpid/python/tests/connection.py +++ b/qpid/python/tests/connection.py @@ -51,8 +51,16 @@ class TestSession(Delegate): def queue_query(self, qq): return qq._type.result.type.new((qq.queue,), {}) - def message_transfer(self, cmd, header, body): - self.queue.put((cmd, header, body)) + def message_transfer(self, cmd, headers, body): + if cmd.destination == "echo": + m = Message(body) + m.headers = headers + self.session.message_transfer(cmd.destination, cmd.accept_mode, + cmd.acquire_mode, m) + elif cmd.destination == "abort": + self.session.channel.connection.sock.close() + else: + self.queue.put((cmd, headers, body)) class ConnectionTest(TestCase): @@ -134,3 +142,59 @@ class ConnectionTest(TestCase): qq = ssn.queue_query("asdf") assert qq.queue == "asdf" c.close(5) + + def testCloseGet(self): + c = Connection(connect("0.0.0.0", PORT), self.spec) + c.start(10) + ssn = c.session("test", timeout=10) + echos = ssn.incoming("echo") + + for i in range(10): + ssn.message_transfer("echo", message=Message("test%d" % i)) + + ssn.auto_sync=False + ssn.message_transfer("abort") + + for i in range(10): + m = echos.get(timeout=10) + assert m.body == "test%d" % i + + try: + m = echos.get(timeout=10) + assert False + except Closed, e: + pass + + def testCloseListen(self): + c = Connection(connect("0.0.0.0", PORT), self.spec) + c.start(10) + ssn = c.session("test", timeout=10) + echos = ssn.incoming("echo") + + messages = [] + exceptions = [] + condition = Condition() + def listener(m): messages.append(m) + def exc_listener(e): + exceptions.append(e) + condition.acquire() + condition.notify() + condition.release() + + echos.listen(listener, exc_listener) + + for i in range(10): + ssn.message_transfer("echo", message=Message("test%d" % i)) + + ssn.auto_sync=False + ssn.message_transfer("abort") + + condition.acquire() + condition.wait(10) + condition.release() + + for i in range(10): + m = messages.pop(0) + assert m.body == "test%d" % i + + assert len(exceptions) == 1 |