summaryrefslogtreecommitdiff
path: root/python/qpid/peer.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/peer.py')
-rw-r--r--python/qpid/peer.py82
1 files changed, 72 insertions, 10 deletions
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
index 3927f20667..bedc96895b 100644
--- a/python/qpid/peer.py
+++ b/python/qpid/peer.py
@@ -30,6 +30,7 @@ from message import Message
from queue import Queue, Closed as QueueClosed
from content import Content
from cStringIO import StringIO
+from time import time
class Sequence:
@@ -186,11 +187,11 @@ class Channel:
self.requester = Requester(self.write)
self.responder = Responder(self.write)
- self.completion = ExecutionCompletion()
+ self.completion = OutgoingCompletion()
+ self.incoming_completion = IncomingCompletion(self)
# Use reliable framing if version == 0-9.
- # (also for 0-10 while transitioning...)
- self.reliable = (spec.major == 0 and (spec.minor == 9 or spec.minor == 10))
+ self.reliable = (spec.major == 0 and spec.minor == 9)
self.use_execution_layer = (spec.major == 0 and spec.minor == 10)
self.synchronous = True
@@ -202,6 +203,7 @@ class Channel:
self.incoming.close()
self.responses.close()
self.completion.close()
+ self.incoming_completion.reset()
def write(self, frame, content = None):
if self.closed:
@@ -252,6 +254,9 @@ class Channel:
self.responder.respond(method, batch, request)
def invoke(self, type, args, kwargs):
+ if type.klass.name == "channel" and (type.name == "close" or type.name == "open"):
+ self.completion.reset()
+ self.incoming_completion.reset()
self.completion.next_command(type)
content = kwargs.pop("content", None)
frame = Method(type, type.arguments(*args, **kwargs))
@@ -306,6 +311,13 @@ class Channel:
return Message(self, resp, content)
else:
raise ValueError(resp)
+ elif self.synchronous and not frame.method.response \
+ and self.use_execution_layer and frame.method.klass.name != "execution":
+ self.execution_flush()
+ self.completion.wait()
+ if self.closed:
+ raise Closed(self.reason)
+
except QueueClosed, e:
if self.closed:
raise Closed(self.reason)
@@ -349,21 +361,32 @@ class Future:
def is_complete(self):
return self.completed.isSet()
-class ExecutionCompletion:
+class OutgoingCompletion:
+ """
+ Manages completion of outgoing commands i.e. command sent by this peer
+ """
+
def __init__(self):
self.condition = threading.Condition()
- self.sequence = Sequence(1)
- self.command_id = 0
- self.mark = 0
+
+ self.sequence = Sequence(1) #issues ids for outgoing commands
+ self.command_id = 0 #last issued id
+ self.mark = 0 #commands up to this mark are known to be complete
+ self.closed = False
def next_command(self, method):
#the following test is a hack until the track/sub-channel is available
if method.klass.name != "execution":
self.command_id = self.sequence.next()
+ def reset(self):
+ self.sequence = Sequence(1) #reset counter
+
def close(self):
+ self.reset()
self.condition.acquire()
try:
+ self.closed = True
self.condition.notifyAll()
finally:
self.condition.release()
@@ -378,11 +401,50 @@ class ExecutionCompletion:
def wait(self, point_of_interest=-1, timeout=None):
if point_of_interest == -1: point_of_interest = self.command_id
+ start_time = time()
+ remaining = timeout
self.condition.acquire()
try:
- if point_of_interest > self.mark:
- self.condition.wait(timeout)
+ while not self.closed and point_of_interest > self.mark:
+ #print "waiting for ", point_of_interest, " mark is currently at ", self.mark
+ self.condition.wait(remaining)
+ if timeout:
+ if start_time + timeout > time(): break
+ else: remaining = timeout - (time() - start_time)
finally:
self.condition.release()
- #todo: retry until timed out or closed
return point_of_interest <= self.mark
+
+class IncomingCompletion:
+ """
+ Manages completion of incoming commands i.e. command received by this peer
+ """
+
+ def __init__(self, channel):
+ self.sequence = Sequence(1) #issues ids for incoming commands
+ self.mark = 0 #id of last command of whose completion notification was sent to the other peer
+ self.channel = channel
+
+ def next_id(self, method):
+ #the following test is a hack until the track/sub-channel is available
+ if method.klass.name != "execution":
+ return self.sequence.next()
+ else:
+ return 0
+
+ def reset(self):
+ self.sequence = Sequence(1) #reset counter
+
+ def complete(self, mark, cumulative=True):
+ if cumulative:
+ if mark > self.mark:
+ self.mark = mark
+ self.channel.execution_complete(cumulative_execution_mark=self.mark)
+ else:
+ #TODO: record and manage the ranges properly
+ range = [mark, mark]
+ self.channel.execution_complete(cumulative_execution_mark=self.mark, ranged_execution_set=range)
+
+
+
+