summaryrefslogtreecommitdiff
path: root/python/qpid/peer.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/peer.py')
-rw-r--r--python/qpid/peer.py30
1 files changed, 30 insertions, 0 deletions
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
index 72e6a19bc7..9880eea19b 100644
--- a/python/qpid/peer.py
+++ b/python/qpid/peer.py
@@ -186,6 +186,8 @@ class Channel:
self.requester = Requester(self.write)
self.responder = Responder(self.write)
+ self.completion = ExecutionCompletion()
+
# Use reliable framing if version == 0-9.
self.reliable = (spec.major == 0 and spec.minor == 9)
self.synchronous = True
@@ -247,6 +249,7 @@ class Channel:
self.responder.respond(method, batch, request)
def invoke(self, type, args, kwargs):
+ self.completion.next_command(type)
content = kwargs.pop("content", None)
frame = Method(type, type.arguments(*args, **kwargs))
if self.reliable:
@@ -337,3 +340,30 @@ class Future:
def is_complete(self):
return self.completed.isSet()
+
+class ExecutionCompletion:
+ def __init__(self):
+ self.completed = threading.Event()
+ self.sequence = Sequence(0)
+ self.command_id = 0
+ self.mark = 0
+
+ def next_command(self, method):
+ #the following test is a hack until the track/sub-channel is available
+ if method.klass.name != "execution":
+ self.command_id = self.sequence.next()
+
+ def complete(self, mark):
+ self.mark = mark
+ self.completed.set()
+ self.completed.clear()
+
+ def wait(self, point_of_interest=-1, timeout=None):
+ """
+ todo: really want to allow different threads to call this with
+ different points of interest on the same channel, this is a quick
+ hack for now
+ """
+ if point_of_interest == -1: point_of_interest = self.command_id
+ self.completed.wait(timeout)
+ return point_of_interest <= self.mark