diff options
author | Rafael H. Schloming <rhs@apache.org> | 2008-03-07 16:57:43 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2008-03-07 16:57:43 +0000 |
commit | 205ae0e54ba0c5fdeb5d2e884997c80cb52f1799 (patch) | |
tree | cc85d43a8e09cb15111290f9302c4c6188c0784c /python/qpid/session.py | |
parent | e385ba8c6612ac396a4d9ecbd5c8ffa18977e25a (diff) | |
download | qpid-python-205ae0e54ba0c5fdeb5d2e884997c80cb52f1799.tar.gz |
added session.sync(); session.auto_sync; made transfers not auto-complete; fixed bug in RangedSet
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@634744 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/session.py')
-rw-r--r-- | python/qpid/session.py | 54 |
1 files changed, 41 insertions, 13 deletions
diff --git a/python/qpid/session.py b/python/qpid/session.py index b83bd1637f..7a84fa601d 100644 --- a/python/qpid/session.py +++ b/python/qpid/session.py @@ -17,14 +17,15 @@ # under the License. # -from threading import Condition, RLock +from threading import Condition, RLock, currentThread from invoker import Invoker from datatypes import RangedSet, Struct, Future from codec010 import StringCodec from assembler import Segment from queue import Queue from datatypes import Message -from util import wait +from util import wait, notify +from exceptions import * from logging import getLogger class SessionDetached(Exception): pass @@ -37,12 +38,14 @@ def server(*args): class SessionException(Exception): pass +INCOMPLETE = object() + class Session(Invoker): - def __init__(self, name, spec, sync=True, timeout=10, delegate=client): + def __init__(self, name, spec, auto_sync=True, timeout=10, delegate=client): self.name = name self.spec = spec - self.sync = sync + self.auto_sync = auto_sync self.timeout = timeout self.channel = None @@ -72,9 +75,29 @@ class Session(Invoker): finally: self.lock.release() + def error(self): + exc = self.exceptions[:] + if len(exc) == 1: + return exc[0] + else: + return tuple(exc) + + def sync(self, timeout=None): + if currentThread() == self.channel.connection.thread: + raise SessionException("deadlock detected") + self.channel.session_flush(completed=True) + last = self.sender.next_id - 1 + if not wait(self.condition, lambda: + last in self.sender._completed or self.exceptions, + timeout): + raise Timeout() + if self.exceptions: + raise SessionException(self.error()) + def close(self, timeout=None): self.channel.session_detach(self.name) - wait(self.condition, lambda: self.channel is None, timeout) + if not wait(self.condition, lambda: self.channel is None, timeout): + raise Timeout() def resolve_method(self, name): cmd = self.spec.instructions.get(name) @@ -132,10 +155,12 @@ class Session(Invoker): self.send(seg) if type.result: - if self.sync: + if self.auto_sync: return result.get(self.timeout) else: return result + elif self.auto_sync: + self.sync(self.timeout) def received(self, seg): self.receiver.received(seg) @@ -148,6 +173,7 @@ class Session(Invoker): self.assembly = None def dispatch(self, assembly): + segments = assembly[:] cmd = assembly.pop(0).decode(self.spec) args = [] @@ -168,8 +194,9 @@ class Session(Invoker): if cmd.type.result: self.execution_result(cmd.id, result) - for seg in assembly: - self.receiver.completed(seg) + if result is not INCOMPLETE: + for seg in segments: + self.receiver.completed(seg) def send(self, seg): self.sender.send(seg) @@ -212,6 +239,7 @@ class Sender: self.next_id = 0 self.next_offset = 0 self.segments = [] + self._completed = RangedSet() def send(self, seg): seg.id = self.next_id @@ -235,6 +263,8 @@ class Sender: del self.segments[idx] else: idx += 1 + for range in commands.ranges: + self._completed.add(range.lower, range.upper) class Delegate: @@ -249,17 +279,14 @@ class Delegate: self.session.lock.acquire() try: self.session.exceptions.append(ex) - excs = self.session.exceptions[:] - if len(excs) == 1: - error = excs[0] - else: - error = tuple(excs) + error = self.session.error() for id in self.session.results: f = self.session.results.pop(id) f.error(error) for q in self.session._incoming.values(): q.close(error) + notify(self.session.condition) finally: self.session.lock.release() @@ -274,3 +301,4 @@ class Client(Delegate): messages = self.session.incoming(cmd.destination) messages.put(m) msg.debug("RECV: %s", m) + return INCOMPLETE |