diff options
Diffstat (limited to 'python/qpid/peer.py')
-rw-r--r-- | python/qpid/peer.py | 30 |
1 files changed, 30 insertions, 0 deletions
diff --git a/python/qpid/peer.py b/python/qpid/peer.py index 72e6a19bc7..9880eea19b 100644 --- a/python/qpid/peer.py +++ b/python/qpid/peer.py @@ -186,6 +186,8 @@ class Channel: self.requester = Requester(self.write) self.responder = Responder(self.write) + self.completion = ExecutionCompletion() + # Use reliable framing if version == 0-9. self.reliable = (spec.major == 0 and spec.minor == 9) self.synchronous = True @@ -247,6 +249,7 @@ class Channel: self.responder.respond(method, batch, request) def invoke(self, type, args, kwargs): + self.completion.next_command(type) content = kwargs.pop("content", None) frame = Method(type, type.arguments(*args, **kwargs)) if self.reliable: @@ -337,3 +340,30 @@ class Future: def is_complete(self): return self.completed.isSet() + +class ExecutionCompletion: + def __init__(self): + self.completed = threading.Event() + self.sequence = Sequence(0) + self.command_id = 0 + self.mark = 0 + + def next_command(self, method): + #the following test is a hack until the track/sub-channel is available + if method.klass.name != "execution": + self.command_id = self.sequence.next() + + def complete(self, mark): + self.mark = mark + self.completed.set() + self.completed.clear() + + def wait(self, point_of_interest=-1, timeout=None): + """ + todo: really want to allow different threads to call this with + different points of interest on the same channel, this is a quick + hack for now + """ + if point_of_interest == -1: point_of_interest = self.command_id + self.completed.wait(timeout) + return point_of_interest <= self.mark |