summaryrefslogtreecommitdiff
path: root/python/qpid
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-07-27 15:44:52 +0000
committerGordon Sim <gsim@apache.org>2007-07-27 15:44:52 +0000
commit80406d0fb680239a0141b81fb0b9f20d20c9b1e1 (patch)
tree13677bf773bf25db03144aa72c97a49d2810240d /python/qpid
parenta9232d5a02a19f093f212cb0b76772a20b45cb1b (diff)
downloadqpid-python-80406d0fb680239a0141b81fb0b9f20d20c9b1e1.tar.gz
Use execution layer to acknowledge messages.
Turn off 0-9 framing of requests and responses. Some refactoring around message delivery. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560285 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid')
-rw-r--r--python/qpid/codec.py25
-rw-r--r--python/qpid/message.py8
-rw-r--r--python/qpid/peer.py82
-rw-r--r--python/qpid/spec.py2
4 files changed, 95 insertions, 22 deletions
diff --git a/python/qpid/codec.py b/python/qpid/codec.py
index a5228e8003..a0d9696c8b 100644
--- a/python/qpid/codec.py
+++ b/python/qpid/codec.py
@@ -329,12 +329,6 @@ class Codec:
return ReferenceId(self.decode_longstr())
# new domains for 0-10:
-
- def encode_uuid(self, s):
- self.encode_longstr(s)
-
- def decode_uuid(self):
- return self.decode_longstr()
def encode_rfc1982_long(self, s):
self.encode_long(s)
@@ -342,10 +336,21 @@ class Codec:
def decode_rfc1982_long(self):
return self.decode_long()
- #Not done yet
def encode_rfc1982_long_set(self, s):
- self.encode_short(0)
+ self.encode_short(len(s))
+ for i in s:
+ self.encode_long(i)
def decode_rfc1982_long_set(self):
- self.decode_short()
- return 0;
+ count = self.decode_short()
+ set = []
+ for i in range(0, count):
+ set.append(self.decode_long())
+ return set;
+
+ #not correct for 0-10 yet
+ def encode_uuid(self, s):
+ self.encode_longstr(s)
+
+ def decode_uuid(self):
+ return self.decode_longstr()
diff --git a/python/qpid/message.py b/python/qpid/message.py
index f80293180e..970ab9d974 100644
--- a/python/qpid/message.py
+++ b/python/qpid/message.py
@@ -26,7 +26,10 @@ class Message:
self.frame = frame
self.method = frame.method_type
self.content = content
-
+ if self.method.klass.name != "execution":
+ self.command_id = self.channel.incoming_completion.sequence.next()
+ #print "allocated: ", self.command_id, "to ", self.method.klass.name, "_", self.method.name
+
def __len__(self):
return len(self.frame.args)
@@ -66,3 +69,6 @@ class Message:
def __repr__(self):
return Message.REPR % (self.method, self.frame.args, self.content)
+
+ def complete(self, cumulative=True):
+ self.channel.incoming_completion.complete(mark=self.command_id, cumulative=cumulative)
diff --git a/python/qpid/peer.py b/python/qpid/peer.py
index 3927f20667..bedc96895b 100644
--- a/python/qpid/peer.py
+++ b/python/qpid/peer.py
@@ -30,6 +30,7 @@ from message import Message
from queue import Queue, Closed as QueueClosed
from content import Content
from cStringIO import StringIO
+from time import time
class Sequence:
@@ -186,11 +187,11 @@ class Channel:
self.requester = Requester(self.write)
self.responder = Responder(self.write)
- self.completion = ExecutionCompletion()
+ self.completion = OutgoingCompletion()
+ self.incoming_completion = IncomingCompletion(self)
# Use reliable framing if version == 0-9.
- # (also for 0-10 while transitioning...)
- self.reliable = (spec.major == 0 and (spec.minor == 9 or spec.minor == 10))
+ self.reliable = (spec.major == 0 and spec.minor == 9)
self.use_execution_layer = (spec.major == 0 and spec.minor == 10)
self.synchronous = True
@@ -202,6 +203,7 @@ class Channel:
self.incoming.close()
self.responses.close()
self.completion.close()
+ self.incoming_completion.reset()
def write(self, frame, content = None):
if self.closed:
@@ -252,6 +254,9 @@ class Channel:
self.responder.respond(method, batch, request)
def invoke(self, type, args, kwargs):
+ if type.klass.name == "channel" and (type.name == "close" or type.name == "open"):
+ self.completion.reset()
+ self.incoming_completion.reset()
self.completion.next_command(type)
content = kwargs.pop("content", None)
frame = Method(type, type.arguments(*args, **kwargs))
@@ -306,6 +311,13 @@ class Channel:
return Message(self, resp, content)
else:
raise ValueError(resp)
+ elif self.synchronous and not frame.method.response \
+ and self.use_execution_layer and frame.method.klass.name != "execution":
+ self.execution_flush()
+ self.completion.wait()
+ if self.closed:
+ raise Closed(self.reason)
+
except QueueClosed, e:
if self.closed:
raise Closed(self.reason)
@@ -349,21 +361,32 @@ class Future:
def is_complete(self):
return self.completed.isSet()
-class ExecutionCompletion:
+class OutgoingCompletion:
+ """
+ Manages completion of outgoing commands i.e. command sent by this peer
+ """
+
def __init__(self):
self.condition = threading.Condition()
- self.sequence = Sequence(1)
- self.command_id = 0
- self.mark = 0
+
+ self.sequence = Sequence(1) #issues ids for outgoing commands
+ self.command_id = 0 #last issued id
+ self.mark = 0 #commands up to this mark are known to be complete
+ self.closed = False
def next_command(self, method):
#the following test is a hack until the track/sub-channel is available
if method.klass.name != "execution":
self.command_id = self.sequence.next()
+ def reset(self):
+ self.sequence = Sequence(1) #reset counter
+
def close(self):
+ self.reset()
self.condition.acquire()
try:
+ self.closed = True
self.condition.notifyAll()
finally:
self.condition.release()
@@ -378,11 +401,50 @@ class ExecutionCompletion:
def wait(self, point_of_interest=-1, timeout=None):
if point_of_interest == -1: point_of_interest = self.command_id
+ start_time = time()
+ remaining = timeout
self.condition.acquire()
try:
- if point_of_interest > self.mark:
- self.condition.wait(timeout)
+ while not self.closed and point_of_interest > self.mark:
+ #print "waiting for ", point_of_interest, " mark is currently at ", self.mark
+ self.condition.wait(remaining)
+ if timeout:
+ if start_time + timeout > time(): break
+ else: remaining = timeout - (time() - start_time)
finally:
self.condition.release()
- #todo: retry until timed out or closed
return point_of_interest <= self.mark
+
+class IncomingCompletion:
+ """
+ Manages completion of incoming commands i.e. command received by this peer
+ """
+
+ def __init__(self, channel):
+ self.sequence = Sequence(1) #issues ids for incoming commands
+ self.mark = 0 #id of last command of whose completion notification was sent to the other peer
+ self.channel = channel
+
+ def next_id(self, method):
+ #the following test is a hack until the track/sub-channel is available
+ if method.klass.name != "execution":
+ return self.sequence.next()
+ else:
+ return 0
+
+ def reset(self):
+ self.sequence = Sequence(1) #reset counter
+
+ def complete(self, mark, cumulative=True):
+ if cumulative:
+ if mark > self.mark:
+ self.mark = mark
+ self.channel.execution_complete(cumulative_execution_mark=self.mark)
+ else:
+ #TODO: record and manage the ranges properly
+ range = [mark, mark]
+ self.channel.execution_complete(cumulative_execution_mark=self.mark, ranged_execution_set=range)
+
+
+
+
diff --git a/python/qpid/spec.py b/python/qpid/spec.py
index c537401164..09e7dc9d0b 100644
--- a/python/qpid/spec.py
+++ b/python/qpid/spec.py
@@ -240,7 +240,7 @@ class Method(Metadata):
"content": None,
"uuid": "",
"rfc1982_long": 0,
- "rfc1982_long_set": 0
+ "rfc1982_long_set": []
}
def define_method(self, name):