diff options
author | Gordon Sim <gsim@apache.org> | 2007-07-27 15:44:52 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-07-27 15:44:52 +0000 |
commit | 80406d0fb680239a0141b81fb0b9f20d20c9b1e1 (patch) | |
tree | 13677bf773bf25db03144aa72c97a49d2810240d /python/qpid | |
parent | a9232d5a02a19f093f212cb0b76772a20b45cb1b (diff) | |
download | qpid-python-80406d0fb680239a0141b81fb0b9f20d20c9b1e1.tar.gz |
Use execution layer to acknowledge messages.
Turn off 0-9 framing of requests and responses.
Some refactoring around message delivery.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560285 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid')
-rw-r--r-- | python/qpid/codec.py | 25 | ||||
-rw-r--r-- | python/qpid/message.py | 8 | ||||
-rw-r--r-- | python/qpid/peer.py | 82 | ||||
-rw-r--r-- | python/qpid/spec.py | 2 |
4 files changed, 95 insertions, 22 deletions
diff --git a/python/qpid/codec.py b/python/qpid/codec.py index a5228e8003..a0d9696c8b 100644 --- a/python/qpid/codec.py +++ b/python/qpid/codec.py @@ -329,12 +329,6 @@ class Codec: return ReferenceId(self.decode_longstr()) # new domains for 0-10: - - def encode_uuid(self, s): - self.encode_longstr(s) - - def decode_uuid(self): - return self.decode_longstr() def encode_rfc1982_long(self, s): self.encode_long(s) @@ -342,10 +336,21 @@ class Codec: def decode_rfc1982_long(self): return self.decode_long() - #Not done yet def encode_rfc1982_long_set(self, s): - self.encode_short(0) + self.encode_short(len(s)) + for i in s: + self.encode_long(i) def decode_rfc1982_long_set(self): - self.decode_short() - return 0; + count = self.decode_short() + set = [] + for i in range(0, count): + set.append(self.decode_long()) + return set; + + #not correct for 0-10 yet + def encode_uuid(self, s): + self.encode_longstr(s) + + def decode_uuid(self): + return self.decode_longstr() diff --git a/python/qpid/message.py b/python/qpid/message.py index f80293180e..970ab9d974 100644 --- a/python/qpid/message.py +++ b/python/qpid/message.py @@ -26,7 +26,10 @@ class Message: self.frame = frame self.method = frame.method_type self.content = content - + if self.method.klass.name != "execution": + self.command_id = self.channel.incoming_completion.sequence.next() + #print "allocated: ", self.command_id, "to ", self.method.klass.name, "_", self.method.name + def __len__(self): return len(self.frame.args) @@ -66,3 +69,6 @@ class Message: def __repr__(self): return Message.REPR % (self.method, self.frame.args, self.content) + + def complete(self, cumulative=True): + self.channel.incoming_completion.complete(mark=self.command_id, cumulative=cumulative) diff --git a/python/qpid/peer.py b/python/qpid/peer.py index 3927f20667..bedc96895b 100644 --- a/python/qpid/peer.py +++ b/python/qpid/peer.py @@ -30,6 +30,7 @@ from message import Message from queue import Queue, Closed as QueueClosed from content import Content from cStringIO import StringIO +from time import time class Sequence: @@ -186,11 +187,11 @@ class Channel: self.requester = Requester(self.write) self.responder = Responder(self.write) - self.completion = ExecutionCompletion() + self.completion = OutgoingCompletion() + self.incoming_completion = IncomingCompletion(self) # Use reliable framing if version == 0-9. - # (also for 0-10 while transitioning...) - self.reliable = (spec.major == 0 and (spec.minor == 9 or spec.minor == 10)) + self.reliable = (spec.major == 0 and spec.minor == 9) self.use_execution_layer = (spec.major == 0 and spec.minor == 10) self.synchronous = True @@ -202,6 +203,7 @@ class Channel: self.incoming.close() self.responses.close() self.completion.close() + self.incoming_completion.reset() def write(self, frame, content = None): if self.closed: @@ -252,6 +254,9 @@ class Channel: self.responder.respond(method, batch, request) def invoke(self, type, args, kwargs): + if type.klass.name == "channel" and (type.name == "close" or type.name == "open"): + self.completion.reset() + self.incoming_completion.reset() self.completion.next_command(type) content = kwargs.pop("content", None) frame = Method(type, type.arguments(*args, **kwargs)) @@ -306,6 +311,13 @@ class Channel: return Message(self, resp, content) else: raise ValueError(resp) + elif self.synchronous and not frame.method.response \ + and self.use_execution_layer and frame.method.klass.name != "execution": + self.execution_flush() + self.completion.wait() + if self.closed: + raise Closed(self.reason) + except QueueClosed, e: if self.closed: raise Closed(self.reason) @@ -349,21 +361,32 @@ class Future: def is_complete(self): return self.completed.isSet() -class ExecutionCompletion: +class OutgoingCompletion: + """ + Manages completion of outgoing commands i.e. command sent by this peer + """ + def __init__(self): self.condition = threading.Condition() - self.sequence = Sequence(1) - self.command_id = 0 - self.mark = 0 + + self.sequence = Sequence(1) #issues ids for outgoing commands + self.command_id = 0 #last issued id + self.mark = 0 #commands up to this mark are known to be complete + self.closed = False def next_command(self, method): #the following test is a hack until the track/sub-channel is available if method.klass.name != "execution": self.command_id = self.sequence.next() + def reset(self): + self.sequence = Sequence(1) #reset counter + def close(self): + self.reset() self.condition.acquire() try: + self.closed = True self.condition.notifyAll() finally: self.condition.release() @@ -378,11 +401,50 @@ class ExecutionCompletion: def wait(self, point_of_interest=-1, timeout=None): if point_of_interest == -1: point_of_interest = self.command_id + start_time = time() + remaining = timeout self.condition.acquire() try: - if point_of_interest > self.mark: - self.condition.wait(timeout) + while not self.closed and point_of_interest > self.mark: + #print "waiting for ", point_of_interest, " mark is currently at ", self.mark + self.condition.wait(remaining) + if timeout: + if start_time + timeout > time(): break + else: remaining = timeout - (time() - start_time) finally: self.condition.release() - #todo: retry until timed out or closed return point_of_interest <= self.mark + +class IncomingCompletion: + """ + Manages completion of incoming commands i.e. command received by this peer + """ + + def __init__(self, channel): + self.sequence = Sequence(1) #issues ids for incoming commands + self.mark = 0 #id of last command of whose completion notification was sent to the other peer + self.channel = channel + + def next_id(self, method): + #the following test is a hack until the track/sub-channel is available + if method.klass.name != "execution": + return self.sequence.next() + else: + return 0 + + def reset(self): + self.sequence = Sequence(1) #reset counter + + def complete(self, mark, cumulative=True): + if cumulative: + if mark > self.mark: + self.mark = mark + self.channel.execution_complete(cumulative_execution_mark=self.mark) + else: + #TODO: record and manage the ranges properly + range = [mark, mark] + self.channel.execution_complete(cumulative_execution_mark=self.mark, ranged_execution_set=range) + + + + diff --git a/python/qpid/spec.py b/python/qpid/spec.py index c537401164..09e7dc9d0b 100644 --- a/python/qpid/spec.py +++ b/python/qpid/spec.py @@ -240,7 +240,7 @@ class Method(Metadata): "content": None, "uuid": "", "rfc1982_long": 0, - "rfc1982_long_set": 0 + "rfc1982_long_set": [] } def define_method(self, name): |