diff options
Diffstat (limited to 'qpid/python')
-rw-r--r-- | qpid/python/qpid/client.py | 3 | ||||
-rw-r--r-- | qpid/python/qpid/peer.py | 30 | ||||
-rw-r--r-- | qpid/python/tests_0-9/basic.py | 2 |
3 files changed, 34 insertions, 1 deletions
diff --git a/qpid/python/qpid/client.py b/qpid/python/qpid/client.py index cdceb87bdf..f1800204db 100644 --- a/qpid/python/qpid/client.py +++ b/qpid/python/qpid/client.py @@ -140,6 +140,9 @@ class ClientDelegate(Delegate): def connection_close(self, ch, msg): self.client.peer.close(msg) + def execution_complete(self, ch, msg): + ch.completion.complete(msg.cumulative_execution_mark) + def close(self, reason): self.client.closed = True self.client.reason = reason diff --git a/qpid/python/qpid/peer.py b/qpid/python/qpid/peer.py index 72e6a19bc7..9880eea19b 100644 --- a/qpid/python/qpid/peer.py +++ b/qpid/python/qpid/peer.py @@ -186,6 +186,8 @@ class Channel: self.requester = Requester(self.write) self.responder = Responder(self.write) + self.completion = ExecutionCompletion() + # Use reliable framing if version == 0-9. self.reliable = (spec.major == 0 and spec.minor == 9) self.synchronous = True @@ -247,6 +249,7 @@ class Channel: self.responder.respond(method, batch, request) def invoke(self, type, args, kwargs): + self.completion.next_command(type) content = kwargs.pop("content", None) frame = Method(type, type.arguments(*args, **kwargs)) if self.reliable: @@ -337,3 +340,30 @@ class Future: def is_complete(self): return self.completed.isSet() + +class ExecutionCompletion: + def __init__(self): + self.completed = threading.Event() + self.sequence = Sequence(0) + self.command_id = 0 + self.mark = 0 + + def next_command(self, method): + #the following test is a hack until the track/sub-channel is available + if method.klass.name != "execution": + self.command_id = self.sequence.next() + + def complete(self, mark): + self.mark = mark + self.completed.set() + self.completed.clear() + + def wait(self, point_of_interest=-1, timeout=None): + """ + todo: really want to allow different threads to call this with + different points of interest on the same channel, this is a quick + hack for now + """ + if point_of_interest == -1: point_of_interest = self.command_id + self.completed.wait(timeout) + return point_of_interest <= self.mark diff --git a/qpid/python/tests_0-9/basic.py b/qpid/python/tests_0-9/basic.py index 75633908cd..e7d22e00da 100644 --- a/qpid/python/tests_0-9/basic.py +++ b/qpid/python/tests_0-9/basic.py @@ -127,7 +127,7 @@ class BasicTests(TestBase): channel.basic_publish(routing_key="test-queue-4", content=Content("One")) myqueue = self.client.queue("my-consumer") - msg = myqueue.get(timeout=5) + msg = myqueue.get(timeout=1) self.assertEqual("One", msg.content.body) #cancel should stop messages being delivered |