summaryrefslogtreecommitdiff
path: root/python/qpid/session.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/session.py')
-rw-r--r--python/qpid/session.py44
1 files changed, 35 insertions, 9 deletions
diff --git a/python/qpid/session.py b/python/qpid/session.py
index c628762dac..b83bd1637f 100644
--- a/python/qpid/session.py
+++ b/python/qpid/session.py
@@ -17,13 +17,14 @@
# under the License.
#
-from threading import Event, RLock
+from threading import Condition, RLock
from invoker import Invoker
from datatypes import RangedSet, Struct, Future
from codec010 import StringCodec
from assembler import Segment
from queue import Queue
from datatypes import Message
+from util import wait
from logging import getLogger
class SessionDetached(Exception): pass
@@ -34,6 +35,8 @@ def client(*args):
def server(*args):
return Server(*args)
+class SessionException(Exception): pass
+
class Session(Invoker):
def __init__(self, name, spec, sync=True, timeout=10, delegate=client):
@@ -42,17 +45,22 @@ class Session(Invoker):
self.sync = sync
self.timeout = timeout
self.channel = None
- self.opened = Event()
- self.closed = Event()
+
+ self.condition = Condition()
+
+ self.send_id = True
self.receiver = Receiver(self)
self.sender = Sender(self)
- self.delegate = delegate(self)
- self.send_id = True
- self.results = {}
+
self.lock = RLock()
self._incoming = {}
+ self.results = {}
+ self.exceptions = []
+
self.assembly = None
+ self.delegate = delegate(self)
+
def incoming(self, destination):
self.lock.acquire()
try:
@@ -66,7 +74,7 @@ class Session(Invoker):
def close(self, timeout=None):
self.channel.session_detach(self.name)
- self.closed.wait(timeout=timeout)
+ wait(self.condition, lambda: self.channel is None, timeout)
def resolve_method(self, name):
cmd = self.spec.instructions.get(name)
@@ -105,7 +113,7 @@ class Session(Invoker):
type.segment_type, type.track, self.channel.id, sc.encoded)
if type.result:
- result = Future()
+ result = Future(exception=SessionException)
self.results[self.sender.next_id] = result
self.send(seg)
@@ -234,9 +242,27 @@ class Delegate:
self.session = session
def execution_result(self, er):
- future = self.session.results[er.command_id]
+ future = self.session.results.pop(er.command_id)
future.set(er.value)
+ def execution_exception(self, ex):
+ self.session.lock.acquire()
+ try:
+ self.session.exceptions.append(ex)
+ excs = self.session.exceptions[:]
+ if len(excs) == 1:
+ error = excs[0]
+ else:
+ error = tuple(excs)
+ for id in self.session.results:
+ f = self.session.results.pop(id)
+ f.error(error)
+
+ for q in self.session._incoming.values():
+ q.close(error)
+ finally:
+ self.session.lock.release()
+
msg = getLogger("qpid.ssn.msg")
class Client(Delegate):