summaryrefslogtreecommitdiff
path: root/python/qpid/peer.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/peer.py')
-rw-r--r--python/qpid/peer.py16
1 files changed, 13 insertions, 3 deletions
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
index 03c48bef90..6762f774f4 100644
--- a/python/qpid/peer.py
+++ b/python/qpid/peer.py
@@ -189,6 +189,7 @@ class Channel:
self.completion = OutgoingCompletion()
self.incoming_completion = IncomingCompletion(self)
+ self.futures = {}
# Use reliable framing if version == 0-9.
if spec.major == 0 and spec.minor == 9:
@@ -261,6 +262,7 @@ class Channel:
self.completion.reset()
self.incoming_completion.reset()
self.completion.next_command(type)
+
content = kwargs.pop("content", None)
frame = Method(type, type.arguments(*args, **kwargs))
return self.invoker(frame, content)
@@ -275,7 +277,7 @@ class Channel:
self.request(frame, self.queue_response, content)
if not frame.method.responses:
- if self.use_execution_layer and type.is_l4_command():
+ if self.use_execution_layer and frame.method_type.is_l4_command():
self.execution_flush()
self.completion.wait()
if self.closed:
@@ -287,7 +289,6 @@ class Channel:
return Message(self, resp, read_content(self.responses))
else:
return Message(self, resp)
-
except QueueClosed, e:
if self.closed:
raise Closed(self.reason)
@@ -296,6 +297,11 @@ class Channel:
# used for 0-8 and 0-10
def invoke_method(self, frame, content = None):
+ if frame.method.result:
+ cmd_id = self.completion.command_id
+ future = Future()
+ self.futures[cmd_id] = future
+
self.write(frame, content)
try:
@@ -316,6 +322,11 @@ class Channel:
return Message(self, resp, content)
else:
raise ValueError(resp)
+ elif frame.method.result:
+ if self.synchronous:
+ return future.get_response(timeout=10)
+ else:
+ return future
elif self.synchronous and not frame.method.response \
and self.use_execution_layer and frame.method.is_l4_command():
self.execution_flush()
@@ -324,7 +335,6 @@ class Channel:
raise Closed(self.reason)
if not completed:
self.close("Timed-out waiting for completion")
-
except QueueClosed, e:
if self.closed:
raise Closed(self.reason)