summaryrefslogtreecommitdiff
path: root/python/qpid/peer.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/peer.py')
-rw-r--r--python/qpid/peer.py27
1 files changed, 26 insertions, 1 deletions
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
index b5c655dc2a..6c8c6647c9 100644
--- a/python/qpid/peer.py
+++ b/python/qpid/peer.py
@@ -50,13 +50,14 @@ class Sequence:
class Peer:
- def __init__(self, conn, delegate):
+ def __init__(self, conn, delegate, channel_callback=None):
self.conn = conn
self.delegate = delegate
self.outgoing = Queue(0)
self.work = Queue(0)
self.channels = {}
self.lock = thread.allocate_lock()
+ self.channel_callback = channel_callback #notified when channels are created
def channel(self, id):
self.lock.acquire()
@@ -66,6 +67,8 @@ class Peer:
except KeyError:
ch = Channel(id, self.outgoing, self.conn.spec)
self.channels[id] = ch
+ if self.channel_callback:
+ self.channel_callback(ch)
finally:
self.lock.release()
return ch
@@ -177,6 +180,7 @@ class Channel:
# XXX: better switch
self.reliable = False
+ self.synchronous = True
def close(self, reason):
if self.closed:
@@ -238,6 +242,12 @@ class Channel:
content = kwargs.pop("content", None)
frame = Method(type, type.arguments(*args, **kwargs))
if self.reliable:
+ if not self.synchronous:
+ future = Future()
+ self.request(frame, future.put_response, content)
+ if not frame.method.responses: return None
+ else: return future
+
self.request(frame, self.queue_response, content)
if not frame.method.responses:
return None
@@ -304,3 +314,18 @@ def read_content(queue):
buf.write(content)
read += len(content)
return Content(buf.getvalue(), children, header.properties.copy())
+
+class Future:
+ def __init__(self):
+ self.completed = threading.Event()
+
+ def put_response(self, channel, response):
+ self.response = response
+ self.completed.set()
+
+ def get_response(self, timeout=None):
+ self.completed.wait(timeout)
+ return self.response
+
+ def is_complete(self):
+ return self.completed.isSet()