summaryrefslogtreecommitdiff
path: root/python/qpid/peer.py
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-03-19 19:39:55 +0000
committerAlan Conway <aconway@apache.org>2007-03-19 19:39:55 +0000
commita96bf8ba7ce40d12ee4b3f85002133e1738225a4 (patch)
tree13db6eefd1120c228c11ff7d94a500bbbd4d1289 /python/qpid/peer.py
parent27e6ef93eea10d1aeb7ca6a6a37926aa5f85c380 (diff)
downloadqpid-python-a96bf8ba7ce40d12ee4b3f85002133e1738225a4.tar.gz
Merged revisions 504590 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9 ........ r504590 | gsim | 2007-02-07 10:36:01 -0500 (Wed, 07 Feb 2007) | 6 lines Added support for receiving and sending of references Added asynchronous mode to channels (responses can be tracked via a future, rather than blocking on each request) Added ability to override server suggested connection tune params Added two tests for reference functionality (more to follow) ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@520061 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/peer.py')
-rw-r--r--python/qpid/peer.py27
1 files changed, 26 insertions, 1 deletions
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
index b5c655dc2a..6c8c6647c9 100644
--- a/python/qpid/peer.py
+++ b/python/qpid/peer.py
@@ -50,13 +50,14 @@ class Sequence:
class Peer:
- def __init__(self, conn, delegate):
+ def __init__(self, conn, delegate, channel_callback=None):
self.conn = conn
self.delegate = delegate
self.outgoing = Queue(0)
self.work = Queue(0)
self.channels = {}
self.lock = thread.allocate_lock()
+ self.channel_callback = channel_callback #notified when channels are created
def channel(self, id):
self.lock.acquire()
@@ -66,6 +67,8 @@ class Peer:
except KeyError:
ch = Channel(id, self.outgoing, self.conn.spec)
self.channels[id] = ch
+ if self.channel_callback:
+ self.channel_callback(ch)
finally:
self.lock.release()
return ch
@@ -177,6 +180,7 @@ class Channel:
# XXX: better switch
self.reliable = False
+ self.synchronous = True
def close(self, reason):
if self.closed:
@@ -238,6 +242,12 @@ class Channel:
content = kwargs.pop("content", None)
frame = Method(type, type.arguments(*args, **kwargs))
if self.reliable:
+ if not self.synchronous:
+ future = Future()
+ self.request(frame, future.put_response, content)
+ if not frame.method.responses: return None
+ else: return future
+
self.request(frame, self.queue_response, content)
if not frame.method.responses:
return None
@@ -304,3 +314,18 @@ def read_content(queue):
buf.write(content)
read += len(content)
return Content(buf.getvalue(), children, header.properties.copy())
+
+class Future:
+ def __init__(self):
+ self.completed = threading.Event()
+
+ def put_response(self, channel, response):
+ self.response = response
+ self.completed.set()
+
+ def get_response(self, timeout=None):
+ self.completed.wait(timeout)
+ return self.response
+
+ def is_complete(self):
+ return self.completed.isSet()