summaryrefslogtreecommitdiff
path: root/python/qpid/session.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-03-07 13:55:00 +0000
committerRafael H. Schloming <rhs@apache.org>2008-03-07 13:55:00 +0000
commit22912b1f7e82542b66f53be02ea1b8be3402e728 (patch)
tree56bf0bf58c7726fe5f4dd7e8d0adad0ac69f2e53 /python/qpid/session.py
parent6c5dae81f69c2b5fa8fbc3dee0ac90a6bdbb76fa (diff)
downloadqpid-python-22912b1f7e82542b66f53be02ea1b8be3402e728.tar.gz
added timeouts to hello-010-world; switched to conditions rather than events for handling connection/session state; handle session exceptions
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@634678 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/session.py')
-rw-r--r--python/qpid/session.py44
1 files changed, 35 insertions, 9 deletions
diff --git a/python/qpid/session.py b/python/qpid/session.py
index c628762dac..b83bd1637f 100644
--- a/python/qpid/session.py
+++ b/python/qpid/session.py
@@ -17,13 +17,14 @@
# under the License.
#
-from threading import Event, RLock
+from threading import Condition, RLock
from invoker import Invoker
from datatypes import RangedSet, Struct, Future
from codec010 import StringCodec
from assembler import Segment
from queue import Queue
from datatypes import Message
+from util import wait
from logging import getLogger
class SessionDetached(Exception): pass
@@ -34,6 +35,8 @@ def client(*args):
def server(*args):
return Server(*args)
+class SessionException(Exception): pass
+
class Session(Invoker):
def __init__(self, name, spec, sync=True, timeout=10, delegate=client):
@@ -42,17 +45,22 @@ class Session(Invoker):
self.sync = sync
self.timeout = timeout
self.channel = None
- self.opened = Event()
- self.closed = Event()
+
+ self.condition = Condition()
+
+ self.send_id = True
self.receiver = Receiver(self)
self.sender = Sender(self)
- self.delegate = delegate(self)
- self.send_id = True
- self.results = {}
+
self.lock = RLock()
self._incoming = {}
+ self.results = {}
+ self.exceptions = []
+
self.assembly = None
+ self.delegate = delegate(self)
+
def incoming(self, destination):
self.lock.acquire()
try:
@@ -66,7 +74,7 @@ class Session(Invoker):
def close(self, timeout=None):
self.channel.session_detach(self.name)
- self.closed.wait(timeout=timeout)
+ wait(self.condition, lambda: self.channel is None, timeout)
def resolve_method(self, name):
cmd = self.spec.instructions.get(name)
@@ -105,7 +113,7 @@ class Session(Invoker):
type.segment_type, type.track, self.channel.id, sc.encoded)
if type.result:
- result = Future()
+ result = Future(exception=SessionException)
self.results[self.sender.next_id] = result
self.send(seg)
@@ -234,9 +242,27 @@ class Delegate:
self.session = session
def execution_result(self, er):
- future = self.session.results[er.command_id]
+ future = self.session.results.pop(er.command_id)
future.set(er.value)
+ def execution_exception(self, ex):
+ self.session.lock.acquire()
+ try:
+ self.session.exceptions.append(ex)
+ excs = self.session.exceptions[:]
+ if len(excs) == 1:
+ error = excs[0]
+ else:
+ error = tuple(excs)
+ for id in self.session.results:
+ f = self.session.results.pop(id)
+ f.error(error)
+
+ for q in self.session._incoming.values():
+ q.close(error)
+ finally:
+ self.session.lock.release()
+
msg = getLogger("qpid.ssn.msg")
class Client(Delegate):