diff options
Diffstat (limited to 'python/qpid/peer.py')
-rw-r--r-- | python/qpid/peer.py | 27 |
1 files changed, 26 insertions, 1 deletions
diff --git a/python/qpid/peer.py b/python/qpid/peer.py index b5c655dc2a..6c8c6647c9 100644 --- a/python/qpid/peer.py +++ b/python/qpid/peer.py @@ -50,13 +50,14 @@ class Sequence: class Peer: - def __init__(self, conn, delegate): + def __init__(self, conn, delegate, channel_callback=None): self.conn = conn self.delegate = delegate self.outgoing = Queue(0) self.work = Queue(0) self.channels = {} self.lock = thread.allocate_lock() + self.channel_callback = channel_callback #notified when channels are created def channel(self, id): self.lock.acquire() @@ -66,6 +67,8 @@ class Peer: except KeyError: ch = Channel(id, self.outgoing, self.conn.spec) self.channels[id] = ch + if self.channel_callback: + self.channel_callback(ch) finally: self.lock.release() return ch @@ -177,6 +180,7 @@ class Channel: # XXX: better switch self.reliable = False + self.synchronous = True def close(self, reason): if self.closed: @@ -238,6 +242,12 @@ class Channel: content = kwargs.pop("content", None) frame = Method(type, type.arguments(*args, **kwargs)) if self.reliable: + if not self.synchronous: + future = Future() + self.request(frame, future.put_response, content) + if not frame.method.responses: return None + else: return future + self.request(frame, self.queue_response, content) if not frame.method.responses: return None @@ -304,3 +314,18 @@ def read_content(queue): buf.write(content) read += len(content) return Content(buf.getvalue(), children, header.properties.copy()) + +class Future: + def __init__(self): + self.completed = threading.Event() + + def put_response(self, channel, response): + self.response = response + self.completed.set() + + def get_response(self, timeout=None): + self.completed.wait(timeout) + return self.response + + def is_complete(self): + return self.completed.isSet() |