diff options
Diffstat (limited to 'python')
-rwxr-xr-x | python/hello-world | 12 | ||||
-rw-r--r-- | python/qpid/__init__.py | 23 | ||||
-rw-r--r-- | python/qpid/client.py | 6 | ||||
-rw-r--r-- | python/qpid/codec.py | 27 | ||||
-rw-r--r-- | python/qpid/connection.py | 6 | ||||
-rw-r--r-- | python/qpid/content.py | 8 | ||||
-rw-r--r-- | python/qpid/message.py | 2 | ||||
-rw-r--r-- | python/qpid/peer.py | 16 | ||||
-rw-r--r-- | python/qpid/spec.py | 16 | ||||
-rwxr-xr-x | python/server | 44 | ||||
-rw-r--r-- | python/tests/codec.py | 7 |
11 files changed, 139 insertions, 28 deletions
diff --git a/python/hello-world b/python/hello-world index 8b7a2752c5..4fb065ae53 100755 --- a/python/hello-world +++ b/python/hello-world @@ -3,13 +3,13 @@ import qpid from qpid.client import Client from qpid.content import Content -client = Client("127.0.0.1", 5672, qpid.spec.load("../specs/amqp.0-9.xml", - "../specs/amqp-errata.0-9.xml")) +client = Client("127.0.0.1", 5672, qpid.spec.load("../specs/amqp.0-10-preview.xml")) client.start({"LOGIN": "guest", "PASSWORD": "guest"}) ch = client.channel(1) -ch.channel_open() +ch.session_open() ch.queue_declare(queue="test") ch.queue_bind(exchange="amq.direct", queue="test", routing_key="test") -ch.message_consume(queue="test", destination="test") -ch.message_transfer(destination="amq.direct", routing_key="test", - body="hello world") +print ch.queue_query(queue="test") +ch.message_subscribe(queue="test", destination="test") +ch.message_transfer(destination="amq.direct", + content=Content("hello world")) diff --git a/python/qpid/__init__.py b/python/qpid/__init__.py index 4363f175fb..4aeccae38e 100644 --- a/python/qpid/__init__.py +++ b/python/qpid/__init__.py @@ -18,3 +18,26 @@ # import spec, codec, connection, content, peer, delegate, client + +class Struct: + + def __init__(self, type): + self.__dict__["type"] = type + self.__dict__["_values"] = {} + + def _check(self, attr): + field = self.type.fields.byname.get(attr) + if field == None: + raise AttributeError(attr) + return field + + def __setattr__(self, attr, value): + self._check(attr) + self._values[attr] = value + + def __getattr__(self, attr): + field = self._check(attr) + return self._values.get(attr, field.default()) + + def __str__(self): + return "%s %s" % (self.type.type, self._values) diff --git a/python/qpid/client.py b/python/qpid/client.py index f1800204db..3efd79c389 100644 --- a/python/qpid/client.py +++ b/python/qpid/client.py @@ -110,7 +110,7 @@ class ClientDelegate(Delegate): #todo: just override the params, i.e. don't require them # all to be included in tune_params msg.tune_ok(**self.client.tune_params) - else: + else: msg.tune_ok(*msg.frame.args) self.client.started.set() @@ -143,6 +143,10 @@ class ClientDelegate(Delegate): def execution_complete(self, ch, msg): ch.completion.complete(msg.cumulative_execution_mark) + def execution_result(self, ch, msg): + future = ch.futures[msg.command_id] + future.put_response(ch, msg.data) + def close(self, reason): self.client.closed = True self.client.reason = reason diff --git a/python/qpid/codec.py b/python/qpid/codec.py index a11486376d..3920f2c8d9 100644 --- a/python/qpid/codec.py +++ b/python/qpid/codec.py @@ -26,7 +26,7 @@ fields. The unit test for this module is located in tests/codec.py """ -import re +import re, qpid from cStringIO import StringIO from struct import * from reference import ReferenceId @@ -40,11 +40,12 @@ class Codec: class that handles encoding/decoding of AMQP primitives """ - def __init__(self, stream): + def __init__(self, stream, spec): """ initializing the stream/fields used """ self.stream = stream + self.spec = spec self.nwrote = 0 self.nread = 0 self.incoming_bits = [] @@ -163,7 +164,7 @@ class Codec: # short int's valid range is [0,65535] if (o < 0 or o > 65535): - raise ValueError('Valid range of short int is [0,65535]') + raise ValueError('Valid range of short int is [0,65535]: %s' % o) self.pack("!H", o) @@ -255,7 +256,7 @@ class Codec: encodes a table data structure in network byte order """ enc = StringIO() - codec = Codec(enc) + codec = Codec(enc, self.spec) if tbl: for key, value in tbl.items(): if len(key) > 128: @@ -356,3 +357,21 @@ class Codec: def decode_uuid(self): return self.decode_longstr() + + def encode_long_struct(self, s): + enc = StringIO() + codec = Codec(enc, self.spec) + type = s.type + codec.encode_short(type.type) + for f in type.fields: + codec.encode(f.type, getattr(s, f.name)) + codec.flush() + self.encode_longstr(enc.getvalue()) + + def decode_long_struct(self): + codec = Codec(StringIO(self.decode_longstr()), self.spec) + type = self.spec.structs[codec.decode_short()] + s = qpid.Struct(type) + for f in type.fields: + setattr(s, f.name, codec.decode(f.type)) + return s diff --git a/python/qpid/connection.py b/python/qpid/connection.py index cdfa2c2dc0..58235117ef 100644 --- a/python/qpid/connection.py +++ b/python/qpid/connection.py @@ -74,7 +74,7 @@ def listen(host, port, predicate = lambda: True): class Connection: def __init__(self, io, spec): - self.codec = codec.Codec(io) + self.codec = codec.Codec(io, spec) self.spec = spec self.FRAME_END = self.spec.constants.byname["frame_end"].id @@ -95,7 +95,7 @@ class Connection: c.encode_octet(self.spec.constants.byname[frame.type].id) c.encode_short(frame.channel) body = StringIO() - enc = codec.Codec(body) + enc = codec.Codec(body, self.spec) frame.encode(enc) enc.flush() c.encode_longstr(body.getvalue()) @@ -106,7 +106,7 @@ class Connection: type = self.spec.constants.byid[c.decode_octet()].name channel = c.decode_short() body = c.decode_longstr() - dec = codec.Codec(StringIO(body)) + dec = codec.Codec(StringIO(body), self.spec) frame = Frame.DECODERS[type].decode(self.spec, dec, len(body)) frame.channel = channel end = c.decode_octet() diff --git a/python/qpid/content.py b/python/qpid/content.py index bcbea1697c..9391f4f1a8 100644 --- a/python/qpid/content.py +++ b/python/qpid/content.py @@ -48,3 +48,11 @@ class Content: def __delitem__(self, name): del self.properties[name] + + def __str__(self): + if self.children: + return "%s [%s] %s" % (self.properties, + ", ".join(map(str, self.children)), + self.body) + else: + return "%s %s" % (self.properties, self.body) diff --git a/python/qpid/message.py b/python/qpid/message.py index 7e28b63037..c9ea5a8a0c 100644 --- a/python/qpid/message.py +++ b/python/qpid/message.py @@ -29,7 +29,7 @@ class Message: if self.method.is_l4_command(): self.command_id = self.channel.incoming_completion.sequence.next() #print "allocated: ", self.command_id, "to ", self.method.klass.name, "_", self.method.name - + def __len__(self): return len(self.frame.args) diff --git a/python/qpid/peer.py b/python/qpid/peer.py index 03c48bef90..6762f774f4 100644 --- a/python/qpid/peer.py +++ b/python/qpid/peer.py @@ -189,6 +189,7 @@ class Channel: self.completion = OutgoingCompletion() self.incoming_completion = IncomingCompletion(self) + self.futures = {} # Use reliable framing if version == 0-9. if spec.major == 0 and spec.minor == 9: @@ -261,6 +262,7 @@ class Channel: self.completion.reset() self.incoming_completion.reset() self.completion.next_command(type) + content = kwargs.pop("content", None) frame = Method(type, type.arguments(*args, **kwargs)) return self.invoker(frame, content) @@ -275,7 +277,7 @@ class Channel: self.request(frame, self.queue_response, content) if not frame.method.responses: - if self.use_execution_layer and type.is_l4_command(): + if self.use_execution_layer and frame.method_type.is_l4_command(): self.execution_flush() self.completion.wait() if self.closed: @@ -287,7 +289,6 @@ class Channel: return Message(self, resp, read_content(self.responses)) else: return Message(self, resp) - except QueueClosed, e: if self.closed: raise Closed(self.reason) @@ -296,6 +297,11 @@ class Channel: # used for 0-8 and 0-10 def invoke_method(self, frame, content = None): + if frame.method.result: + cmd_id = self.completion.command_id + future = Future() + self.futures[cmd_id] = future + self.write(frame, content) try: @@ -316,6 +322,11 @@ class Channel: return Message(self, resp, content) else: raise ValueError(resp) + elif frame.method.result: + if self.synchronous: + return future.get_response(timeout=10) + else: + return future elif self.synchronous and not frame.method.response \ and self.use_execution_layer and frame.method.is_l4_command(): self.execution_flush() @@ -324,7 +335,6 @@ class Channel: raise Closed(self.reason) if not completed: self.close("Timed-out waiting for completion") - except QueueClosed, e: if self.closed: raise Closed(self.reason) diff --git a/python/qpid/spec.py b/python/qpid/spec.py index f9d305c133..8a511bcb3d 100644 --- a/python/qpid/spec.py +++ b/python/qpid/spec.py @@ -91,6 +91,8 @@ class Spec(Metadata): self.classes = SpecContainer() # methods indexed by classname_methname self.methods = {} + # structs by type code + self.structs = {} def post_load(self): self.module = self.define_module("amqp%s%s" % (self.major, self.minor)) @@ -295,13 +297,18 @@ class Field(Metadata): self.description = description self.docs = docs + def default(self): + return Method.DEFAULTS[self.type] + def get_result(nd, spec): result = nd["result"] if not result: return None name = result["@domain"] if name != None: return spec.domains.byname[name] st_nd = result["struct"] - st = Struct(st_nd["@size"], st_nd["@type"], st_nd["@pack"]) + st = Struct(st_nd["@size"], int(result.parent.parent["@index"])*256 + + int(st_nd["@type"]), st_nd["@pack"]) + spec.structs[st.type] = st load_fields(st_nd, st.fields, spec.domains.byname) return st @@ -352,7 +359,12 @@ def load(specfile, *errata): type = nd["@type"] if type == None: st_nd = nd["struct"] - type = Struct(st_nd["@size"], st_nd["@type"], st_nd["@pack"]) + code = st_nd["@type"] + if code not in (None, "", "none"): + code = int(code) + type = Struct(st_nd["@size"], code, st_nd["@pack"]) + if type.type != None: + spec.structs[type.type] = type structs.append((type, st_nd)) else: type = pythonize(type) diff --git a/python/server b/python/server index 56f0f32081..37416314e2 100755 --- a/python/server +++ b/python/server @@ -3,22 +3,54 @@ import qpid from qpid.connection import Connection, listen from qpid.delegate import Delegate from qpid.peer import Peer +from qpid import Struct class Server(Delegate): + def __init__(self): + Delegate.__init__(self) + self.queues = {} + self.bindings = {} + def connection_open(self, ch, msg): msg.open_ok() - def channel_open(self, ch, msg): - print "channel %s open" % ch.id - msg.open_ok() + def session_open(self, ch, msg): + print "session open on channel %s" % ch.id + msg.attached() + + def execution_flush(self, ch, msg): + pass + + def queue_declare(self, ch, msg): + self.queues[msg.queue] = [] + print "queue declared: %s" % msg.queue + msg.complete() + + def queue_bind(self, ch, msg): + if self.bindings.has_key(msg.exchange): + queues = self.bindings[msg.exchange] + else: + queues = set() + self.bindings[msg.exchange] = queues + queues.add((msg.routing_key, msg.queue)) + msg.complete() + + def queue_query(self, ch, msg): + st = Struct(msg.method.result) + ch.execution_result(msg.command_id, st) + msg.complete() + + def message_subscribe(self, ch, msg): + print msg + msg.complete() def message_transfer(self, ch, msg): - print msg.body - msg.ok() + print msg.content + msg.complete() -spec = qpid.spec.load("../specs/amqp.0-9.xml") +spec = qpid.spec.load("../specs/amqp.0-10-preview.xml") for io in listen("0.0.0.0", 5672): c = Connection(io, spec) diff --git a/python/tests/codec.py b/python/tests/codec.py index 785f1aba6b..689a2cf4c1 100644 --- a/python/tests/codec.py +++ b/python/tests/codec.py @@ -20,6 +20,7 @@ import unittest from qpid.codec import Codec +from qpid.spec import load from cStringIO import StringIO from qpid.reference import ReferenceId @@ -52,11 +53,13 @@ __doc__ = """ """ +SPEC = load("../specs/amqp.0-10-preview.xml") # -------------------------------------- # -------------------------------------- class BaseDataTypes(unittest.TestCase): + """ Base class containing common functions """ @@ -66,7 +69,7 @@ class BaseDataTypes(unittest.TestCase): """ standard setUp for unitetest (refer unittest documentation for details) """ - self.codec = Codec(StringIO()) + self.codec = Codec(StringIO(), SPEC) # ------------------ def tearDown(self): @@ -504,7 +507,7 @@ def test(type, value): else: values = [value] stream = StringIO() - codec = Codec(stream) + codec = Codec(stream, SPEC) for v in values: codec.encode(type, v) codec.flush() |