summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xpython/hello-010-world8
-rw-r--r--python/qpid/connection010.py30
-rw-r--r--python/qpid/datatypes.py9
-rw-r--r--python/qpid/delegates.py14
-rw-r--r--python/qpid/queue.py6
-rw-r--r--python/qpid/session.py44
-rw-r--r--python/qpid/util.py27
7 files changed, 104 insertions, 34 deletions
diff --git a/python/hello-010-world b/python/hello-010-world
index ff01cf9ed7..8c98170873 100755
--- a/python/hello-010-world
+++ b/python/hello-010-world
@@ -29,13 +29,13 @@ ssn.message_transfer("is", None, None, Message(props, "more testing..."))
ssn.message_transfer("a")
ssn.message_transfer("test")
-m1 = ssn.incoming("this").get()
+m1 = ssn.incoming("this").get(timeout=10)
print m1
-m2 = ssn.incoming("is").get()
+m2 = ssn.incoming("is").get(timeout=10)
print m2
-m3 = ssn.incoming("a").get()
+m3 = ssn.incoming("a").get(timeout=10)
print m3
-m4 = ssn.incoming("test").get()
+m4 = ssn.incoming("test").get(timeout=10)
print m4
ssn.message_accept(RangedSet(m1.id, m2.id, m3.id, m4.id))
diff --git a/python/qpid/connection010.py b/python/qpid/connection010.py
index b25efd37a8..b17ceea534 100644
--- a/python/qpid/connection010.py
+++ b/python/qpid/connection010.py
@@ -18,7 +18,8 @@
#
import datatypes, session
-from threading import Thread, Event, RLock
+from threading import Thread, Condition, RLock
+from util import wait
from framer import Closed
from assembler import Assembler, Segment
from codec010 import StringCodec
@@ -47,16 +48,21 @@ class Connection(Assembler):
Assembler.__init__(self, sock)
self.spec = spec
self.track = self.spec["track"]
- self.delegate = delegate(self)
+
+ self.lock = RLock()
self.attached = {}
self.sessions = {}
- self.lock = RLock()
+
+ self.condition = Condition()
+ self.opened = False
+
self.thread = Thread(target=self.run)
self.thread.setDaemon(True)
- self.opened = Event()
- self.closed = Event()
+
self.channel_max = 65535
+ self.delegate = delegate(self)
+
def attach(self, name, ch, delegate, force=False):
self.lock.acquire()
try:
@@ -104,12 +110,13 @@ class Connection(Assembler):
def session(self, name, timeout=None, delegate=session.client):
self.lock.acquire()
try:
- ssn = self.attach(name, Channel(self, self.__channel()), delegate)
+ ch = Channel(self, self.__channel())
+ ssn = self.attach(name, ch, delegate)
ssn.channel.session_attach(name)
- ssn.opened.wait(timeout)
- if ssn.opened.isSet():
+ if wait(ssn.condition, lambda: ssn.channel is not None, timeout):
return ssn
else:
+ self.detach(name, ch)
raise Timeout()
finally:
self.lock.release()
@@ -117,8 +124,7 @@ class Connection(Assembler):
def start(self, timeout=None):
self.delegate.start()
self.thread.start()
- self.opened.wait(timeout=timeout)
- if not self.opened.isSet():
+ if not wait(self.condition, lambda: self.opened, timeout):
raise Timeout()
def run(self):
@@ -132,9 +138,9 @@ class Connection(Assembler):
self.delegate.received(seg)
def close(self, timeout=None):
+ if not self.opened: return
Channel(self, 0).connection_close()
- self.closed.wait(timeout=timeout)
- if not self.closed.isSet():
+ if not wait(self.condition, lambda: not self.opened, timeout):
raise Timeout()
self.thread.join(timeout=timeout)
diff --git a/python/qpid/datatypes.py b/python/qpid/datatypes.py
index 7158e0d75e..bce68aebcd 100644
--- a/python/qpid/datatypes.py
+++ b/python/qpid/datatypes.py
@@ -112,16 +112,23 @@ class RangedSet:
return "RangedSet(%s)" % str(self.ranges)
class Future:
- def __init__(self, initial=None):
+ def __init__(self, initial=None, exception=Exception):
self.value = initial
+ self._error = None
self._set = threading.Event()
+ def error(self, error):
+ self._error = error
+ self._set.set()
+
def set(self, value):
self.value = value
self._set.set()
def get(self, timeout=None):
self._set.wait(timeout)
+ if self._error != None:
+ raise exception(self._error)
return self.value
def is_set(self):
diff --git a/python/qpid/delegates.py b/python/qpid/delegates.py
index 4fdcc37384..83413b91ea 100644
--- a/python/qpid/delegates.py
+++ b/python/qpid/delegates.py
@@ -19,6 +19,7 @@
import connection010
import session
+from util import notify
class Delegate:
@@ -49,7 +50,8 @@ class Delegate:
self.connection.sock.close()
def connection_close_ok(self, ch, close_ok):
- self.connection.closed.set()
+ self.connection.opened = False
+ notify(self.connection.condition)
def session_attach(self, ch, a):
try:
@@ -61,7 +63,7 @@ class Delegate:
ch.session_detached(a.name)
def session_attached(self, ch, a):
- ch.session.opened.set()
+ notify(ch.session.condition)
def session_detach(self, ch, d):
self.connection.detach(d.name, ch)
@@ -70,7 +72,7 @@ class Delegate:
def session_detached(self, ch, d):
ssn = self.connection.detach(d.name, ch)
if ssn is not None:
- ssn.closed.set()
+ notify(ch.session.condition)
def session_command_point(self, ch, cp):
ssn = ch.session
@@ -91,8 +93,9 @@ class Server(Delegate):
pass
def connection_open(self, ch, open):
- self.connection.opened.set()
+ self.connection.opened = True
ch.connection_open_ok()
+ notify(self.connection.condition)
class Client(Delegate):
@@ -108,4 +111,5 @@ class Client(Delegate):
ch.connection_open()
def connection_open_ok(self, ch, open_ok):
- self.connection.opened.set()
+ self.connection.opened = True
+ notify(self.connection.condition)
diff --git a/python/qpid/queue.py b/python/qpid/queue.py
index 00946a9156..ea8f00d091 100644
--- a/python/qpid/queue.py
+++ b/python/qpid/queue.py
@@ -35,10 +35,12 @@ class Queue(BaseQueue):
def __init__(self, *args, **kwargs):
BaseQueue.__init__(self, *args, **kwargs)
+ self.error = None
self.listener = None
self.thread = None
- def close(self):
+ def close(self, error = None):
+ self.error = error
self.put(Queue.END)
def get(self, block = True, timeout = None):
@@ -47,7 +49,7 @@ class Queue(BaseQueue):
# this guarantees that any other waiting threads or any future
# calls to get will also result in a Closed exception
self.put(Queue.END)
- raise Closed()
+ raise Closed(self.error)
else:
return result
diff --git a/python/qpid/session.py b/python/qpid/session.py
index c628762dac..b83bd1637f 100644
--- a/python/qpid/session.py
+++ b/python/qpid/session.py
@@ -17,13 +17,14 @@
# under the License.
#
-from threading import Event, RLock
+from threading import Condition, RLock
from invoker import Invoker
from datatypes import RangedSet, Struct, Future
from codec010 import StringCodec
from assembler import Segment
from queue import Queue
from datatypes import Message
+from util import wait
from logging import getLogger
class SessionDetached(Exception): pass
@@ -34,6 +35,8 @@ def client(*args):
def server(*args):
return Server(*args)
+class SessionException(Exception): pass
+
class Session(Invoker):
def __init__(self, name, spec, sync=True, timeout=10, delegate=client):
@@ -42,17 +45,22 @@ class Session(Invoker):
self.sync = sync
self.timeout = timeout
self.channel = None
- self.opened = Event()
- self.closed = Event()
+
+ self.condition = Condition()
+
+ self.send_id = True
self.receiver = Receiver(self)
self.sender = Sender(self)
- self.delegate = delegate(self)
- self.send_id = True
- self.results = {}
+
self.lock = RLock()
self._incoming = {}
+ self.results = {}
+ self.exceptions = []
+
self.assembly = None
+ self.delegate = delegate(self)
+
def incoming(self, destination):
self.lock.acquire()
try:
@@ -66,7 +74,7 @@ class Session(Invoker):
def close(self, timeout=None):
self.channel.session_detach(self.name)
- self.closed.wait(timeout=timeout)
+ wait(self.condition, lambda: self.channel is None, timeout)
def resolve_method(self, name):
cmd = self.spec.instructions.get(name)
@@ -105,7 +113,7 @@ class Session(Invoker):
type.segment_type, type.track, self.channel.id, sc.encoded)
if type.result:
- result = Future()
+ result = Future(exception=SessionException)
self.results[self.sender.next_id] = result
self.send(seg)
@@ -234,9 +242,27 @@ class Delegate:
self.session = session
def execution_result(self, er):
- future = self.session.results[er.command_id]
+ future = self.session.results.pop(er.command_id)
future.set(er.value)
+ def execution_exception(self, ex):
+ self.session.lock.acquire()
+ try:
+ self.session.exceptions.append(ex)
+ excs = self.session.exceptions[:]
+ if len(excs) == 1:
+ error = excs[0]
+ else:
+ error = tuple(excs)
+ for id in self.session.results:
+ f = self.session.results.pop(id)
+ f.error(error)
+
+ for q in self.session._incoming.values():
+ q.close(error)
+ finally:
+ self.session.lock.release()
+
msg = getLogger("qpid.ssn.msg")
class Client(Delegate):
diff --git a/python/qpid/util.py b/python/qpid/util.py
index e41dfc75fb..d03a9bd7e9 100644
--- a/python/qpid/util.py
+++ b/python/qpid/util.py
@@ -17,7 +17,7 @@
# under the License.
#
-import os, socket
+import os, socket, time
def connect(host, port):
sock = socket.socket()
@@ -40,3 +40,28 @@ def listen(host, port, predicate = lambda: True, bound = lambda: None):
def mtime(filename):
return os.stat(filename).st_mtime
+
+def wait(condition, predicate, timeout=None):
+ condition.acquire()
+ try:
+ passed = 0
+ start = time.time()
+ while not predicate():
+ if timeout is None:
+ condition.wait()
+ elif passed < timeout:
+ condition.wait(timeout - passed)
+ else:
+ return False
+ passed = time.time() - start
+ return True
+ finally:
+ condition.release()
+
+def notify(condition, action=lambda: None):
+ condition.acquire()
+ try:
+ action()
+ condition.notifyAll()
+ finally:
+ condition.release()