diff options
author | Alan Conway <aconway@apache.org> | 2007-03-19 19:39:55 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-03-19 19:39:55 +0000 |
commit | a96bf8ba7ce40d12ee4b3f85002133e1738225a4 (patch) | |
tree | 13db6eefd1120c228c11ff7d94a500bbbd4d1289 /python/qpid/peer.py | |
parent | 27e6ef93eea10d1aeb7ca6a6a37926aa5f85c380 (diff) | |
download | qpid-python-a96bf8ba7ce40d12ee4b3f85002133e1738225a4.tar.gz |
Merged revisions 504590 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9
........
r504590 | gsim | 2007-02-07 10:36:01 -0500 (Wed, 07 Feb 2007) | 6 lines
Added support for receiving and sending of references
Added asynchronous mode to channels (responses can be tracked via a future, rather than blocking on each request)
Added ability to override server suggested connection tune params
Added two tests for reference functionality (more to follow)
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@520061 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/peer.py')
-rw-r--r-- | python/qpid/peer.py | 27 |
1 files changed, 26 insertions, 1 deletions
diff --git a/python/qpid/peer.py b/python/qpid/peer.py index b5c655dc2a..6c8c6647c9 100644 --- a/python/qpid/peer.py +++ b/python/qpid/peer.py @@ -50,13 +50,14 @@ class Sequence: class Peer: - def __init__(self, conn, delegate): + def __init__(self, conn, delegate, channel_callback=None): self.conn = conn self.delegate = delegate self.outgoing = Queue(0) self.work = Queue(0) self.channels = {} self.lock = thread.allocate_lock() + self.channel_callback = channel_callback #notified when channels are created def channel(self, id): self.lock.acquire() @@ -66,6 +67,8 @@ class Peer: except KeyError: ch = Channel(id, self.outgoing, self.conn.spec) self.channels[id] = ch + if self.channel_callback: + self.channel_callback(ch) finally: self.lock.release() return ch @@ -177,6 +180,7 @@ class Channel: # XXX: better switch self.reliable = False + self.synchronous = True def close(self, reason): if self.closed: @@ -238,6 +242,12 @@ class Channel: content = kwargs.pop("content", None) frame = Method(type, type.arguments(*args, **kwargs)) if self.reliable: + if not self.synchronous: + future = Future() + self.request(frame, future.put_response, content) + if not frame.method.responses: return None + else: return future + self.request(frame, self.queue_response, content) if not frame.method.responses: return None @@ -304,3 +314,18 @@ def read_content(queue): buf.write(content) read += len(content) return Content(buf.getvalue(), children, header.properties.copy()) + +class Future: + def __init__(self): + self.completed = threading.Event() + + def put_response(self, channel, response): + self.response = response + self.completed.set() + + def get_response(self, timeout=None): + self.completed.wait(timeout) + return self.response + + def is_complete(self): + return self.completed.isSet() |