diff options
Diffstat (limited to 'python/qpid/session.py')
-rw-r--r-- | python/qpid/session.py | 44 |
1 files changed, 35 insertions, 9 deletions
diff --git a/python/qpid/session.py b/python/qpid/session.py index c628762dac..b83bd1637f 100644 --- a/python/qpid/session.py +++ b/python/qpid/session.py @@ -17,13 +17,14 @@ # under the License. # -from threading import Event, RLock +from threading import Condition, RLock from invoker import Invoker from datatypes import RangedSet, Struct, Future from codec010 import StringCodec from assembler import Segment from queue import Queue from datatypes import Message +from util import wait from logging import getLogger class SessionDetached(Exception): pass @@ -34,6 +35,8 @@ def client(*args): def server(*args): return Server(*args) +class SessionException(Exception): pass + class Session(Invoker): def __init__(self, name, spec, sync=True, timeout=10, delegate=client): @@ -42,17 +45,22 @@ class Session(Invoker): self.sync = sync self.timeout = timeout self.channel = None - self.opened = Event() - self.closed = Event() + + self.condition = Condition() + + self.send_id = True self.receiver = Receiver(self) self.sender = Sender(self) - self.delegate = delegate(self) - self.send_id = True - self.results = {} + self.lock = RLock() self._incoming = {} + self.results = {} + self.exceptions = [] + self.assembly = None + self.delegate = delegate(self) + def incoming(self, destination): self.lock.acquire() try: @@ -66,7 +74,7 @@ class Session(Invoker): def close(self, timeout=None): self.channel.session_detach(self.name) - self.closed.wait(timeout=timeout) + wait(self.condition, lambda: self.channel is None, timeout) def resolve_method(self, name): cmd = self.spec.instructions.get(name) @@ -105,7 +113,7 @@ class Session(Invoker): type.segment_type, type.track, self.channel.id, sc.encoded) if type.result: - result = Future() + result = Future(exception=SessionException) self.results[self.sender.next_id] = result self.send(seg) @@ -234,9 +242,27 @@ class Delegate: self.session = session def execution_result(self, er): - future = self.session.results[er.command_id] + future = self.session.results.pop(er.command_id) future.set(er.value) + def execution_exception(self, ex): + self.session.lock.acquire() + try: + self.session.exceptions.append(ex) + excs = self.session.exceptions[:] + if len(excs) == 1: + error = excs[0] + else: + error = tuple(excs) + for id in self.session.results: + f = self.session.results.pop(id) + f.error(error) + + for q in self.session._incoming.values(): + q.close(error) + finally: + self.session.lock.release() + msg = getLogger("qpid.ssn.msg") class Client(Delegate): |