diff options
-rwxr-xr-x | python/hello-010-world | 8 | ||||
-rw-r--r-- | python/qpid/connection010.py | 30 | ||||
-rw-r--r-- | python/qpid/datatypes.py | 9 | ||||
-rw-r--r-- | python/qpid/delegates.py | 14 | ||||
-rw-r--r-- | python/qpid/queue.py | 6 | ||||
-rw-r--r-- | python/qpid/session.py | 44 | ||||
-rw-r--r-- | python/qpid/util.py | 27 |
7 files changed, 104 insertions, 34 deletions
diff --git a/python/hello-010-world b/python/hello-010-world index ff01cf9ed7..8c98170873 100755 --- a/python/hello-010-world +++ b/python/hello-010-world @@ -29,13 +29,13 @@ ssn.message_transfer("is", None, None, Message(props, "more testing...")) ssn.message_transfer("a") ssn.message_transfer("test") -m1 = ssn.incoming("this").get() +m1 = ssn.incoming("this").get(timeout=10) print m1 -m2 = ssn.incoming("is").get() +m2 = ssn.incoming("is").get(timeout=10) print m2 -m3 = ssn.incoming("a").get() +m3 = ssn.incoming("a").get(timeout=10) print m3 -m4 = ssn.incoming("test").get() +m4 = ssn.incoming("test").get(timeout=10) print m4 ssn.message_accept(RangedSet(m1.id, m2.id, m3.id, m4.id)) diff --git a/python/qpid/connection010.py b/python/qpid/connection010.py index b25efd37a8..b17ceea534 100644 --- a/python/qpid/connection010.py +++ b/python/qpid/connection010.py @@ -18,7 +18,8 @@ # import datatypes, session -from threading import Thread, Event, RLock +from threading import Thread, Condition, RLock +from util import wait from framer import Closed from assembler import Assembler, Segment from codec010 import StringCodec @@ -47,16 +48,21 @@ class Connection(Assembler): Assembler.__init__(self, sock) self.spec = spec self.track = self.spec["track"] - self.delegate = delegate(self) + + self.lock = RLock() self.attached = {} self.sessions = {} - self.lock = RLock() + + self.condition = Condition() + self.opened = False + self.thread = Thread(target=self.run) self.thread.setDaemon(True) - self.opened = Event() - self.closed = Event() + self.channel_max = 65535 + self.delegate = delegate(self) + def attach(self, name, ch, delegate, force=False): self.lock.acquire() try: @@ -104,12 +110,13 @@ class Connection(Assembler): def session(self, name, timeout=None, delegate=session.client): self.lock.acquire() try: - ssn = self.attach(name, Channel(self, self.__channel()), delegate) + ch = Channel(self, self.__channel()) + ssn = self.attach(name, ch, delegate) ssn.channel.session_attach(name) - ssn.opened.wait(timeout) - if ssn.opened.isSet(): + if wait(ssn.condition, lambda: ssn.channel is not None, timeout): return ssn else: + self.detach(name, ch) raise Timeout() finally: self.lock.release() @@ -117,8 +124,7 @@ class Connection(Assembler): def start(self, timeout=None): self.delegate.start() self.thread.start() - self.opened.wait(timeout=timeout) - if not self.opened.isSet(): + if not wait(self.condition, lambda: self.opened, timeout): raise Timeout() def run(self): @@ -132,9 +138,9 @@ class Connection(Assembler): self.delegate.received(seg) def close(self, timeout=None): + if not self.opened: return Channel(self, 0).connection_close() - self.closed.wait(timeout=timeout) - if not self.closed.isSet(): + if not wait(self.condition, lambda: not self.opened, timeout): raise Timeout() self.thread.join(timeout=timeout) diff --git a/python/qpid/datatypes.py b/python/qpid/datatypes.py index 7158e0d75e..bce68aebcd 100644 --- a/python/qpid/datatypes.py +++ b/python/qpid/datatypes.py @@ -112,16 +112,23 @@ class RangedSet: return "RangedSet(%s)" % str(self.ranges) class Future: - def __init__(self, initial=None): + def __init__(self, initial=None, exception=Exception): self.value = initial + self._error = None self._set = threading.Event() + def error(self, error): + self._error = error + self._set.set() + def set(self, value): self.value = value self._set.set() def get(self, timeout=None): self._set.wait(timeout) + if self._error != None: + raise exception(self._error) return self.value def is_set(self): diff --git a/python/qpid/delegates.py b/python/qpid/delegates.py index 4fdcc37384..83413b91ea 100644 --- a/python/qpid/delegates.py +++ b/python/qpid/delegates.py @@ -19,6 +19,7 @@ import connection010 import session +from util import notify class Delegate: @@ -49,7 +50,8 @@ class Delegate: self.connection.sock.close() def connection_close_ok(self, ch, close_ok): - self.connection.closed.set() + self.connection.opened = False + notify(self.connection.condition) def session_attach(self, ch, a): try: @@ -61,7 +63,7 @@ class Delegate: ch.session_detached(a.name) def session_attached(self, ch, a): - ch.session.opened.set() + notify(ch.session.condition) def session_detach(self, ch, d): self.connection.detach(d.name, ch) @@ -70,7 +72,7 @@ class Delegate: def session_detached(self, ch, d): ssn = self.connection.detach(d.name, ch) if ssn is not None: - ssn.closed.set() + notify(ch.session.condition) def session_command_point(self, ch, cp): ssn = ch.session @@ -91,8 +93,9 @@ class Server(Delegate): pass def connection_open(self, ch, open): - self.connection.opened.set() + self.connection.opened = True ch.connection_open_ok() + notify(self.connection.condition) class Client(Delegate): @@ -108,4 +111,5 @@ class Client(Delegate): ch.connection_open() def connection_open_ok(self, ch, open_ok): - self.connection.opened.set() + self.connection.opened = True + notify(self.connection.condition) diff --git a/python/qpid/queue.py b/python/qpid/queue.py index 00946a9156..ea8f00d091 100644 --- a/python/qpid/queue.py +++ b/python/qpid/queue.py @@ -35,10 +35,12 @@ class Queue(BaseQueue): def __init__(self, *args, **kwargs): BaseQueue.__init__(self, *args, **kwargs) + self.error = None self.listener = None self.thread = None - def close(self): + def close(self, error = None): + self.error = error self.put(Queue.END) def get(self, block = True, timeout = None): @@ -47,7 +49,7 @@ class Queue(BaseQueue): # this guarantees that any other waiting threads or any future # calls to get will also result in a Closed exception self.put(Queue.END) - raise Closed() + raise Closed(self.error) else: return result diff --git a/python/qpid/session.py b/python/qpid/session.py index c628762dac..b83bd1637f 100644 --- a/python/qpid/session.py +++ b/python/qpid/session.py @@ -17,13 +17,14 @@ # under the License. # -from threading import Event, RLock +from threading import Condition, RLock from invoker import Invoker from datatypes import RangedSet, Struct, Future from codec010 import StringCodec from assembler import Segment from queue import Queue from datatypes import Message +from util import wait from logging import getLogger class SessionDetached(Exception): pass @@ -34,6 +35,8 @@ def client(*args): def server(*args): return Server(*args) +class SessionException(Exception): pass + class Session(Invoker): def __init__(self, name, spec, sync=True, timeout=10, delegate=client): @@ -42,17 +45,22 @@ class Session(Invoker): self.sync = sync self.timeout = timeout self.channel = None - self.opened = Event() - self.closed = Event() + + self.condition = Condition() + + self.send_id = True self.receiver = Receiver(self) self.sender = Sender(self) - self.delegate = delegate(self) - self.send_id = True - self.results = {} + self.lock = RLock() self._incoming = {} + self.results = {} + self.exceptions = [] + self.assembly = None + self.delegate = delegate(self) + def incoming(self, destination): self.lock.acquire() try: @@ -66,7 +74,7 @@ class Session(Invoker): def close(self, timeout=None): self.channel.session_detach(self.name) - self.closed.wait(timeout=timeout) + wait(self.condition, lambda: self.channel is None, timeout) def resolve_method(self, name): cmd = self.spec.instructions.get(name) @@ -105,7 +113,7 @@ class Session(Invoker): type.segment_type, type.track, self.channel.id, sc.encoded) if type.result: - result = Future() + result = Future(exception=SessionException) self.results[self.sender.next_id] = result self.send(seg) @@ -234,9 +242,27 @@ class Delegate: self.session = session def execution_result(self, er): - future = self.session.results[er.command_id] + future = self.session.results.pop(er.command_id) future.set(er.value) + def execution_exception(self, ex): + self.session.lock.acquire() + try: + self.session.exceptions.append(ex) + excs = self.session.exceptions[:] + if len(excs) == 1: + error = excs[0] + else: + error = tuple(excs) + for id in self.session.results: + f = self.session.results.pop(id) + f.error(error) + + for q in self.session._incoming.values(): + q.close(error) + finally: + self.session.lock.release() + msg = getLogger("qpid.ssn.msg") class Client(Delegate): diff --git a/python/qpid/util.py b/python/qpid/util.py index e41dfc75fb..d03a9bd7e9 100644 --- a/python/qpid/util.py +++ b/python/qpid/util.py @@ -17,7 +17,7 @@ # under the License. # -import os, socket +import os, socket, time def connect(host, port): sock = socket.socket() @@ -40,3 +40,28 @@ def listen(host, port, predicate = lambda: True, bound = lambda: None): def mtime(filename): return os.stat(filename).st_mtime + +def wait(condition, predicate, timeout=None): + condition.acquire() + try: + passed = 0 + start = time.time() + while not predicate(): + if timeout is None: + condition.wait() + elif passed < timeout: + condition.wait(timeout - passed) + else: + return False + passed = time.time() - start + return True + finally: + condition.release() + +def notify(condition, action=lambda: None): + condition.acquire() + try: + action() + condition.notifyAll() + finally: + condition.release() |