summaryrefslogtreecommitdiff
path: root/qpid/python
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python')
-rw-r--r--qpid/python/qpid/client.py3
-rw-r--r--qpid/python/qpid/peer.py30
-rw-r--r--qpid/python/tests_0-9/basic.py2
3 files changed, 34 insertions, 1 deletions
diff --git a/qpid/python/qpid/client.py b/qpid/python/qpid/client.py
index cdceb87bdf..f1800204db 100644
--- a/qpid/python/qpid/client.py
+++ b/qpid/python/qpid/client.py
@@ -140,6 +140,9 @@ class ClientDelegate(Delegate):
def connection_close(self, ch, msg):
self.client.peer.close(msg)
+ def execution_complete(self, ch, msg):
+ ch.completion.complete(msg.cumulative_execution_mark)
+
def close(self, reason):
self.client.closed = True
self.client.reason = reason
diff --git a/qpid/python/qpid/peer.py b/qpid/python/qpid/peer.py
index 72e6a19bc7..9880eea19b 100644
--- a/qpid/python/qpid/peer.py
+++ b/qpid/python/qpid/peer.py
@@ -186,6 +186,8 @@ class Channel:
self.requester = Requester(self.write)
self.responder = Responder(self.write)
+ self.completion = ExecutionCompletion()
+
# Use reliable framing if version == 0-9.
self.reliable = (spec.major == 0 and spec.minor == 9)
self.synchronous = True
@@ -247,6 +249,7 @@ class Channel:
self.responder.respond(method, batch, request)
def invoke(self, type, args, kwargs):
+ self.completion.next_command(type)
content = kwargs.pop("content", None)
frame = Method(type, type.arguments(*args, **kwargs))
if self.reliable:
@@ -337,3 +340,30 @@ class Future:
def is_complete(self):
return self.completed.isSet()
+
+class ExecutionCompletion:
+ def __init__(self):
+ self.completed = threading.Event()
+ self.sequence = Sequence(0)
+ self.command_id = 0
+ self.mark = 0
+
+ def next_command(self, method):
+ #the following test is a hack until the track/sub-channel is available
+ if method.klass.name != "execution":
+ self.command_id = self.sequence.next()
+
+ def complete(self, mark):
+ self.mark = mark
+ self.completed.set()
+ self.completed.clear()
+
+ def wait(self, point_of_interest=-1, timeout=None):
+ """
+ todo: really want to allow different threads to call this with
+ different points of interest on the same channel, this is a quick
+ hack for now
+ """
+ if point_of_interest == -1: point_of_interest = self.command_id
+ self.completed.wait(timeout)
+ return point_of_interest <= self.mark
diff --git a/qpid/python/tests_0-9/basic.py b/qpid/python/tests_0-9/basic.py
index 75633908cd..e7d22e00da 100644
--- a/qpid/python/tests_0-9/basic.py
+++ b/qpid/python/tests_0-9/basic.py
@@ -127,7 +127,7 @@ class BasicTests(TestBase):
channel.basic_publish(routing_key="test-queue-4", content=Content("One"))
myqueue = self.client.queue("my-consumer")
- msg = myqueue.get(timeout=5)
+ msg = myqueue.get(timeout=1)
self.assertEqual("One", msg.content.body)
#cancel should stop messages being delivered