diff options
Diffstat (limited to 'python/qpid/peer.py')
-rw-r--r-- | python/qpid/peer.py | 82 |
1 files changed, 72 insertions, 10 deletions
diff --git a/python/qpid/peer.py b/python/qpid/peer.py index 3927f20667..bedc96895b 100644 --- a/python/qpid/peer.py +++ b/python/qpid/peer.py @@ -30,6 +30,7 @@ from message import Message from queue import Queue, Closed as QueueClosed from content import Content from cStringIO import StringIO +from time import time class Sequence: @@ -186,11 +187,11 @@ class Channel: self.requester = Requester(self.write) self.responder = Responder(self.write) - self.completion = ExecutionCompletion() + self.completion = OutgoingCompletion() + self.incoming_completion = IncomingCompletion(self) # Use reliable framing if version == 0-9. - # (also for 0-10 while transitioning...) - self.reliable = (spec.major == 0 and (spec.minor == 9 or spec.minor == 10)) + self.reliable = (spec.major == 0 and spec.minor == 9) self.use_execution_layer = (spec.major == 0 and spec.minor == 10) self.synchronous = True @@ -202,6 +203,7 @@ class Channel: self.incoming.close() self.responses.close() self.completion.close() + self.incoming_completion.reset() def write(self, frame, content = None): if self.closed: @@ -252,6 +254,9 @@ class Channel: self.responder.respond(method, batch, request) def invoke(self, type, args, kwargs): + if type.klass.name == "channel" and (type.name == "close" or type.name == "open"): + self.completion.reset() + self.incoming_completion.reset() self.completion.next_command(type) content = kwargs.pop("content", None) frame = Method(type, type.arguments(*args, **kwargs)) @@ -306,6 +311,13 @@ class Channel: return Message(self, resp, content) else: raise ValueError(resp) + elif self.synchronous and not frame.method.response \ + and self.use_execution_layer and frame.method.klass.name != "execution": + self.execution_flush() + self.completion.wait() + if self.closed: + raise Closed(self.reason) + except QueueClosed, e: if self.closed: raise Closed(self.reason) @@ -349,21 +361,32 @@ class Future: def is_complete(self): return self.completed.isSet() -class ExecutionCompletion: +class OutgoingCompletion: + """ + Manages completion of outgoing commands i.e. command sent by this peer + """ + def __init__(self): self.condition = threading.Condition() - self.sequence = Sequence(1) - self.command_id = 0 - self.mark = 0 + + self.sequence = Sequence(1) #issues ids for outgoing commands + self.command_id = 0 #last issued id + self.mark = 0 #commands up to this mark are known to be complete + self.closed = False def next_command(self, method): #the following test is a hack until the track/sub-channel is available if method.klass.name != "execution": self.command_id = self.sequence.next() + def reset(self): + self.sequence = Sequence(1) #reset counter + def close(self): + self.reset() self.condition.acquire() try: + self.closed = True self.condition.notifyAll() finally: self.condition.release() @@ -378,11 +401,50 @@ class ExecutionCompletion: def wait(self, point_of_interest=-1, timeout=None): if point_of_interest == -1: point_of_interest = self.command_id + start_time = time() + remaining = timeout self.condition.acquire() try: - if point_of_interest > self.mark: - self.condition.wait(timeout) + while not self.closed and point_of_interest > self.mark: + #print "waiting for ", point_of_interest, " mark is currently at ", self.mark + self.condition.wait(remaining) + if timeout: + if start_time + timeout > time(): break + else: remaining = timeout - (time() - start_time) finally: self.condition.release() - #todo: retry until timed out or closed return point_of_interest <= self.mark + +class IncomingCompletion: + """ + Manages completion of incoming commands i.e. command received by this peer + """ + + def __init__(self, channel): + self.sequence = Sequence(1) #issues ids for incoming commands + self.mark = 0 #id of last command of whose completion notification was sent to the other peer + self.channel = channel + + def next_id(self, method): + #the following test is a hack until the track/sub-channel is available + if method.klass.name != "execution": + return self.sequence.next() + else: + return 0 + + def reset(self): + self.sequence = Sequence(1) #reset counter + + def complete(self, mark, cumulative=True): + if cumulative: + if mark > self.mark: + self.mark = mark + self.channel.execution_complete(cumulative_execution_mark=self.mark) + else: + #TODO: record and manage the ranges properly + range = [mark, mark] + self.channel.execution_complete(cumulative_execution_mark=self.mark, ranged_execution_set=range) + + + + |