diff options
Diffstat (limited to 'python/qpid')
-rw-r--r-- | python/qpid/__init__.py | 23 | ||||
-rw-r--r-- | python/qpid/client.py | 6 | ||||
-rw-r--r-- | python/qpid/codec.py | 27 | ||||
-rw-r--r-- | python/qpid/connection.py | 6 | ||||
-rw-r--r-- | python/qpid/content.py | 8 | ||||
-rw-r--r-- | python/qpid/message.py | 2 | ||||
-rw-r--r-- | python/qpid/peer.py | 16 | ||||
-rw-r--r-- | python/qpid/spec.py | 16 |
8 files changed, 90 insertions, 14 deletions
diff --git a/python/qpid/__init__.py b/python/qpid/__init__.py index 4363f175fb..4aeccae38e 100644 --- a/python/qpid/__init__.py +++ b/python/qpid/__init__.py @@ -18,3 +18,26 @@ # import spec, codec, connection, content, peer, delegate, client + +class Struct: + + def __init__(self, type): + self.__dict__["type"] = type + self.__dict__["_values"] = {} + + def _check(self, attr): + field = self.type.fields.byname.get(attr) + if field == None: + raise AttributeError(attr) + return field + + def __setattr__(self, attr, value): + self._check(attr) + self._values[attr] = value + + def __getattr__(self, attr): + field = self._check(attr) + return self._values.get(attr, field.default()) + + def __str__(self): + return "%s %s" % (self.type.type, self._values) diff --git a/python/qpid/client.py b/python/qpid/client.py index f1800204db..3efd79c389 100644 --- a/python/qpid/client.py +++ b/python/qpid/client.py @@ -110,7 +110,7 @@ class ClientDelegate(Delegate): #todo: just override the params, i.e. don't require them # all to be included in tune_params msg.tune_ok(**self.client.tune_params) - else: + else: msg.tune_ok(*msg.frame.args) self.client.started.set() @@ -143,6 +143,10 @@ class ClientDelegate(Delegate): def execution_complete(self, ch, msg): ch.completion.complete(msg.cumulative_execution_mark) + def execution_result(self, ch, msg): + future = ch.futures[msg.command_id] + future.put_response(ch, msg.data) + def close(self, reason): self.client.closed = True self.client.reason = reason diff --git a/python/qpid/codec.py b/python/qpid/codec.py index a11486376d..3920f2c8d9 100644 --- a/python/qpid/codec.py +++ b/python/qpid/codec.py @@ -26,7 +26,7 @@ fields. The unit test for this module is located in tests/codec.py """ -import re +import re, qpid from cStringIO import StringIO from struct import * from reference import ReferenceId @@ -40,11 +40,12 @@ class Codec: class that handles encoding/decoding of AMQP primitives """ - def __init__(self, stream): + def __init__(self, stream, spec): """ initializing the stream/fields used """ self.stream = stream + self.spec = spec self.nwrote = 0 self.nread = 0 self.incoming_bits = [] @@ -163,7 +164,7 @@ class Codec: # short int's valid range is [0,65535] if (o < 0 or o > 65535): - raise ValueError('Valid range of short int is [0,65535]') + raise ValueError('Valid range of short int is [0,65535]: %s' % o) self.pack("!H", o) @@ -255,7 +256,7 @@ class Codec: encodes a table data structure in network byte order """ enc = StringIO() - codec = Codec(enc) + codec = Codec(enc, self.spec) if tbl: for key, value in tbl.items(): if len(key) > 128: @@ -356,3 +357,21 @@ class Codec: def decode_uuid(self): return self.decode_longstr() + + def encode_long_struct(self, s): + enc = StringIO() + codec = Codec(enc, self.spec) + type = s.type + codec.encode_short(type.type) + for f in type.fields: + codec.encode(f.type, getattr(s, f.name)) + codec.flush() + self.encode_longstr(enc.getvalue()) + + def decode_long_struct(self): + codec = Codec(StringIO(self.decode_longstr()), self.spec) + type = self.spec.structs[codec.decode_short()] + s = qpid.Struct(type) + for f in type.fields: + setattr(s, f.name, codec.decode(f.type)) + return s diff --git a/python/qpid/connection.py b/python/qpid/connection.py index cdfa2c2dc0..58235117ef 100644 --- a/python/qpid/connection.py +++ b/python/qpid/connection.py @@ -74,7 +74,7 @@ def listen(host, port, predicate = lambda: True): class Connection: def __init__(self, io, spec): - self.codec = codec.Codec(io) + self.codec = codec.Codec(io, spec) self.spec = spec self.FRAME_END = self.spec.constants.byname["frame_end"].id @@ -95,7 +95,7 @@ class Connection: c.encode_octet(self.spec.constants.byname[frame.type].id) c.encode_short(frame.channel) body = StringIO() - enc = codec.Codec(body) + enc = codec.Codec(body, self.spec) frame.encode(enc) enc.flush() c.encode_longstr(body.getvalue()) @@ -106,7 +106,7 @@ class Connection: type = self.spec.constants.byid[c.decode_octet()].name channel = c.decode_short() body = c.decode_longstr() - dec = codec.Codec(StringIO(body)) + dec = codec.Codec(StringIO(body), self.spec) frame = Frame.DECODERS[type].decode(self.spec, dec, len(body)) frame.channel = channel end = c.decode_octet() diff --git a/python/qpid/content.py b/python/qpid/content.py index bcbea1697c..9391f4f1a8 100644 --- a/python/qpid/content.py +++ b/python/qpid/content.py @@ -48,3 +48,11 @@ class Content: def __delitem__(self, name): del self.properties[name] + + def __str__(self): + if self.children: + return "%s [%s] %s" % (self.properties, + ", ".join(map(str, self.children)), + self.body) + else: + return "%s %s" % (self.properties, self.body) diff --git a/python/qpid/message.py b/python/qpid/message.py index 7e28b63037..c9ea5a8a0c 100644 --- a/python/qpid/message.py +++ b/python/qpid/message.py @@ -29,7 +29,7 @@ class Message: if self.method.is_l4_command(): self.command_id = self.channel.incoming_completion.sequence.next() #print "allocated: ", self.command_id, "to ", self.method.klass.name, "_", self.method.name - + def __len__(self): return len(self.frame.args) diff --git a/python/qpid/peer.py b/python/qpid/peer.py index 03c48bef90..6762f774f4 100644 --- a/python/qpid/peer.py +++ b/python/qpid/peer.py @@ -189,6 +189,7 @@ class Channel: self.completion = OutgoingCompletion() self.incoming_completion = IncomingCompletion(self) + self.futures = {} # Use reliable framing if version == 0-9. if spec.major == 0 and spec.minor == 9: @@ -261,6 +262,7 @@ class Channel: self.completion.reset() self.incoming_completion.reset() self.completion.next_command(type) + content = kwargs.pop("content", None) frame = Method(type, type.arguments(*args, **kwargs)) return self.invoker(frame, content) @@ -275,7 +277,7 @@ class Channel: self.request(frame, self.queue_response, content) if not frame.method.responses: - if self.use_execution_layer and type.is_l4_command(): + if self.use_execution_layer and frame.method_type.is_l4_command(): self.execution_flush() self.completion.wait() if self.closed: @@ -287,7 +289,6 @@ class Channel: return Message(self, resp, read_content(self.responses)) else: return Message(self, resp) - except QueueClosed, e: if self.closed: raise Closed(self.reason) @@ -296,6 +297,11 @@ class Channel: # used for 0-8 and 0-10 def invoke_method(self, frame, content = None): + if frame.method.result: + cmd_id = self.completion.command_id + future = Future() + self.futures[cmd_id] = future + self.write(frame, content) try: @@ -316,6 +322,11 @@ class Channel: return Message(self, resp, content) else: raise ValueError(resp) + elif frame.method.result: + if self.synchronous: + return future.get_response(timeout=10) + else: + return future elif self.synchronous and not frame.method.response \ and self.use_execution_layer and frame.method.is_l4_command(): self.execution_flush() @@ -324,7 +335,6 @@ class Channel: raise Closed(self.reason) if not completed: self.close("Timed-out waiting for completion") - except QueueClosed, e: if self.closed: raise Closed(self.reason) diff --git a/python/qpid/spec.py b/python/qpid/spec.py index f9d305c133..8a511bcb3d 100644 --- a/python/qpid/spec.py +++ b/python/qpid/spec.py @@ -91,6 +91,8 @@ class Spec(Metadata): self.classes = SpecContainer() # methods indexed by classname_methname self.methods = {} + # structs by type code + self.structs = {} def post_load(self): self.module = self.define_module("amqp%s%s" % (self.major, self.minor)) @@ -295,13 +297,18 @@ class Field(Metadata): self.description = description self.docs = docs + def default(self): + return Method.DEFAULTS[self.type] + def get_result(nd, spec): result = nd["result"] if not result: return None name = result["@domain"] if name != None: return spec.domains.byname[name] st_nd = result["struct"] - st = Struct(st_nd["@size"], st_nd["@type"], st_nd["@pack"]) + st = Struct(st_nd["@size"], int(result.parent.parent["@index"])*256 + + int(st_nd["@type"]), st_nd["@pack"]) + spec.structs[st.type] = st load_fields(st_nd, st.fields, spec.domains.byname) return st @@ -352,7 +359,12 @@ def load(specfile, *errata): type = nd["@type"] if type == None: st_nd = nd["struct"] - type = Struct(st_nd["@size"], st_nd["@type"], st_nd["@pack"]) + code = st_nd["@type"] + if code not in (None, "", "none"): + code = int(code) + type = Struct(st_nd["@size"], code, st_nd["@pack"]) + if type.type != None: + spec.structs[type.type] = type structs.append((type, st_nd)) else: type = pythonize(type) |