diff options
author | Rafael H. Schloming <rhs@apache.org> | 2007-01-16 05:28:25 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2007-01-16 05:28:25 +0000 |
commit | 8021505b531ba4c022ab89f8a2bd59bf8af3c1fc (patch) | |
tree | 7165613b0996b55b466e140accdf6f81bae33b4c /python | |
parent | d24ea0b5a3f4ca7c1a7f30f9af99b4e7338a1d85 (diff) | |
download | qpid-python-8021505b531ba4c022ab89f8a2bd59bf8af3c1fc.tar.gz |
0-9 request/response framing for python
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496593 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rwxr-xr-x | python/amqp-doc | 2 | ||||
-rwxr-xr-x | python/hello-world | 11 | ||||
-rw-r--r-- | python/qpid/client.py | 21 | ||||
-rw-r--r-- | python/qpid/codec.py | 18 | ||||
-rw-r--r-- | python/qpid/connection.py | 204 | ||||
-rw-r--r-- | python/qpid/delegate.py | 20 | ||||
-rw-r--r-- | python/qpid/message.py | 63 | ||||
-rw-r--r-- | python/qpid/peer.py | 190 | ||||
-rw-r--r-- | python/qpid/spec.py | 74 | ||||
-rwxr-xr-x | python/server | 30 | ||||
-rw-r--r-- | python/tests/example.py | 4 |
11 files changed, 424 insertions, 213 deletions
diff --git a/python/amqp-doc b/python/amqp-doc index 0e7f9e862a..00226d63cb 100755 --- a/python/amqp-doc +++ b/python/amqp-doc @@ -42,7 +42,7 @@ except GetoptError, e: die(str(e)) regexp = False -spec = "../specs/amqp.0-8.xml" +spec = "../specs/amqp.0-9.xml" for k, v in opts: if k == "-e" or k == "--regexp": regexp = True if k == "-s" or k == "--spec": spec = v diff --git a/python/hello-world b/python/hello-world new file mode 100755 index 0000000000..b05873dff3 --- /dev/null +++ b/python/hello-world @@ -0,0 +1,11 @@ +#!/usr/bin/env python +import qpid +from qpid.client import Client +from qpid.content import Content + +client = Client("127.0.0.1", 5672, qpid.spec.load("../specs/amqp.0-9.xml")) +client.start({"LOGIN": "guest", "PASSWORD": "guest"}) +ch = client.channel(1) +ch.channel_open() +ch.message_transfer(destination="amq.direct", routing_key="asdf", + body="hello world") diff --git a/python/qpid/client.py b/python/qpid/client.py index b4a282f251..3083cd4933 100644 --- a/python/qpid/client.py +++ b/python/qpid/client.py @@ -25,7 +25,7 @@ interacting with the server. import threading from peer import Peer, Closed from delegate import Delegate -from connection import Connection, Frame +from connection import Connection, Frame, connect from spec import load from queue import Queue @@ -49,15 +49,13 @@ class Client: self.lock = threading.Lock() self.closed = False + self.reason = None self.started = threading.Event() - self.conn = Connection(self.host, self.port, self.spec) - self.peer = Peer(self.conn, ClientDelegate(self)) - def wait(self): self.started.wait() if self.closed: - raise EOFError() + raise Closed(self.reason) def queue(self, key): self.lock.acquire() @@ -76,7 +74,9 @@ class Client: self.response = response self.locale = locale - self.conn.connect() + self.conn = Connection(connect(self.host, self.port), self.spec) + self.peer = Peer(self.conn, ClientDelegate(self)) + self.conn.init() self.peer.start() self.wait() @@ -92,12 +92,12 @@ class ClientDelegate(Delegate): self.client = client def connection_start(self, ch, msg): - ch.connection_start_ok(mechanism=self.client.mechanism, - response=self.client.response, - locale=self.client.locale) + msg.start_ok(mechanism=self.client.mechanism, + response=self.client.response, + locale=self.client.locale) def connection_tune(self, ch, msg): - ch.connection_tune_ok(*msg.fields) + msg.tune_ok(*msg.frame.args) self.client.started.set() def basic_deliver(self, ch, msg): @@ -111,4 +111,5 @@ class ClientDelegate(Delegate): def close(self, reason): self.client.closed = True + self.client.reason = reason self.client.started.set() diff --git a/python/qpid/codec.py b/python/qpid/codec.py index 69c7ca8afa..d8617c2937 100644 --- a/python/qpid/codec.py +++ b/python/qpid/codec.py @@ -185,6 +185,24 @@ class Codec: result[key] = value return result + def encode_timestamp(self, t): + # XXX + self.encode_longlong(t) + + def decode_timestamp(self): + # XXX + return self.decode_longlong() + + def encode_content(self, s): + # XXX + self.encode_octet(0) + self.encode_longstr(s) + + def decode_content(self): + # XXX + self.decode_octet() + return self.decode_longstr() + def test(type, value): if isinstance(value, (list, tuple)): values = value diff --git a/python/qpid/connection.py b/python/qpid/connection.py index 0b788e091b..75fb134760 100644 --- a/python/qpid/connection.py +++ b/python/qpid/connection.py @@ -25,7 +25,7 @@ server, or even a proxy implementation. import socket, codec,logging from cStringIO import StringIO -from spec import load, pythonize +from spec import load from codec import EOF class SockIO: @@ -53,19 +53,27 @@ class SockIO: def flush(self): pass +def connect(host, port): + sock = socket.socket() + sock.connect((host, port)) + sock.setblocking(1) + return SockIO(sock) + +def listen(host, port, predicate = lambda: True): + sock = socket.socket() + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind((host, port)) + sock.listen(5) + while predicate(): + s, a = sock.accept() + yield SockIO(s) + class Connection: - def __init__(self, host, port, spec): - self.host = host - self.port = port + def __init__(self, io, spec): + self.codec = codec.Codec(io) self.spec = spec - self.FRAME_END = self.spec.constants.bypyname["frame_end"].id - - def connect(self): - sock = socket.socket() - sock.connect((self.host, self.port)) - sock.setblocking(1) - self.codec = codec.Codec(SockIO(sock)) + self.FRAME_END = self.spec.constants.byname["frame_end"].id def flush(self): self.codec.flush() @@ -76,53 +84,55 @@ class Connection: self.codec.pack(Connection.INIT, "AMQP", 1, 1, self.spec.major, self.spec.minor) + def tini(self): + self.codec.unpack(Connection.INIT) + def write(self, frame): c = self.codec - c.encode_octet(self.spec.constants.bypyname[frame.payload.type].id) + c.encode_octet(self.spec.constants.byname[frame.type].id) c.encode_short(frame.channel) - frame.payload.encode(c) + body = StringIO() + enc = codec.Codec(body) + frame.encode(enc) + enc.flush() + c.encode_longstr(body.getvalue()) c.encode_octet(self.FRAME_END) def read(self): c = self.codec - type = pythonize(self.spec.constants.byid[c.decode_octet()].name) + type = self.spec.constants.byid[c.decode_octet()].name channel = c.decode_short() - payload = Frame.DECODERS[type].decode(self.spec, c) + body = c.decode_longstr() + dec = codec.Codec(StringIO(body)) + frame = Frame.DECODERS[type].decode(self.spec, dec, len(body)) + frame.channel = channel end = c.decode_octet() if end != self.FRAME_END: - raise "frame error: expected %r, got %r" % (self.FRAME_END, end) - frame = Frame(channel, payload) + garbage = "" + while end != self.FRAME_END: + garbage += chr(end) + end = c.decode_octet() + raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage) return frame class Frame: - METHOD = "frame_method" - HEADER = "frame_header" - BODY = "frame_body" - OOB_METHOD = "frame_oob_method" - OOB_HEADER = "frame_oob_header" - OOB_BODY = "frame_oob_body" - TRACE = "frame_trace" - HEARTBEAT = "frame_heartbeat" - DECODERS = {} - def __init__(self, channel, payload): - self.channel = channel - self.payload = payload - - def __str__(self): - return "[%d] %s" % (self.channel, self.payload) - -class Payload: - class __metaclass__(type): def __new__(cls, name, bases, dict): - for req in ("encode", "decode", "type"): - if not dict.has_key(req): - raise TypeError("%s must define %s" % (name, req)) + for attr in ("encode", "decode", "type"): + if not dict.has_key(attr): + raise TypeError("%s must define %s" % (name, attr)) dict["decode"] = staticmethod(dict["decode"]) + if dict.has_key("__init__"): + __init__ = dict["__init__"] + def init(self, *args, **kwargs): + args = list(args) + self.init(args, kwargs) + __init__(self, *args, **kwargs) + dict["__init__"] = init t = type.__new__(cls, name, bases, dict) if t.type != None: Frame.DECODERS[t.type] = t @@ -130,50 +140,100 @@ class Payload: type = None + def init(self, args, kwargs): + self.channel = kwargs.pop("channel", 0) + def encode(self, enc): abstract - def decode(spec, dec): abstract + def decode(spec, dec, size): abstract -class Method(Payload): +class Method(Frame): - type = Frame.METHOD + type = "frame_method" - def __init__(self, method, *args): + def __init__(self, method, args): if len(args) != len(method.fields): - argspec = ["%s: %s" % (pythonize(f.name), f.type) + argspec = ["%s: %s" % (f.name, f.type) for f in method.fields] raise TypeError("%s.%s expecting (%s), got %s" % - (pythonize(method.klass.name), - pythonize(method.name), ", ".join(argspec), args)) + (method.klass.name, method.name, ", ".join(argspec), + args)) self.method = method + self.method_type = method self.args = args - def encode(self, enc): - buf = StringIO() - c = codec.Codec(buf) + def encode(self, c): c.encode_short(self.method.klass.id) c.encode_short(self.method.id) for field, arg in zip(self.method.fields, self.args): c.encode(field.type, arg) - c.flush() - enc.encode_longstr(buf.getvalue()) - def decode(spec, dec): - enc = dec.decode_longstr() - c = codec.Codec(StringIO(enc)) + def decode(spec, c, size): klass = spec.classes.byid[c.decode_short()] meth = klass.methods.byid[c.decode_short()] args = tuple([c.decode(f.type) for f in meth.fields]) - return Method(meth, *args) + return Method(meth, args) def __str__(self): - return "%s %s" % (self.method, ", ".join([str(a) for a in self.args])) + return "[%s] %s %s" % (self.channel, self.method, + ", ".join([str(a) for a in self.args])) + +class Request(Frame): -class Header(Payload): + type = "frame_request" - type = Frame.HEADER + def __init__(self, id, response_mark, method): + self.id = id + self.response_mark = response_mark + self.method = method + self.method_type = method.method_type + self.args = method.args + + def encode(self, enc): + enc.encode_longlong(self.id) + enc.encode_longlong(self.response_mark) + # reserved + enc.encode_long(0) + self.method.encode(enc) + + def decode(spec, dec, size): + id = dec.decode_longlong() + mark = dec.decode_longlong() + # reserved + dec.decode_long() + method = Method.decode(spec, dec, size - 20) + return Request(id, mark, method) + +class Response(Frame): + + type = "frame_response" + + def __init__(self, id, request_id, batch_offset, method): + self.id = id + self.request_id = request_id + self.batch_offset = batch_offset + self.method = method + self.method_type = method.method_type + self.args = method.args + + def encode(self, enc): + enc.encode_longlong(self.id) + enc.encode_longlong(self.request_id) + enc.encode_long(self.batch_offset) + self.method.encode(enc) - def __init__(self, klass, weight, size, **properties): + def decode(spec, dec, size): + id = dec.decode_longlong() + request_id = dec.decode_longlong() + batch_offset = dec.decode_long() + method = Method.decode(spec, dec, size - 20) + return Response(id, request_id, batch_offset, method) + +class Header(Frame): + + type = "frame_header" + + def __init__(self, klass, weight, size, properties): self.klass = klass self.weight = weight self.size = size @@ -188,9 +248,7 @@ class Header(Payload): def __delitem__(self, name): del self.properties[name] - def encode(self, enc): - buf = StringIO() - c = codec.Codec(buf) + def encode(self, c): c.encode_short(self.klass.id) c.encode_short(self.weight) c.encode_longlong(self.size) @@ -218,11 +276,8 @@ class Header(Payload): v = self.properties.get(f.name) if v != None: c.encode(f.type, v) - c.flush() - enc.encode_longstr(buf.getvalue()) - def decode(spec, dec): - c = codec.Codec(StringIO(dec.decode_longstr())) + def decode(spec, c, size): klass = spec.classes.byid[c.decode_short()] weight = c.decode_short() size = c.decode_longlong() @@ -247,24 +302,31 @@ class Header(Payload): # plain '' strings can be used as keywords so we need to # stringify the names. properties[str(f.name)] = c.decode(f.type) - return Header(klass, weight, size, **properties) + return Header(klass, weight, size, properties) def __str__(self): return "%s %s %s %s" % (self.klass, self.weight, self.size, self.properties) -class Body(Payload): +class Body(Frame): - type = Frame.BODY + type = "frame_body" def __init__(self, content): self.content = content def encode(self, enc): - enc.encode_longstr(self.content) + enc.write(self.content) - def decode(spec, dec): - return Body(dec.decode_longstr()) + def decode(spec, dec, size): + return Body(dec.read(size)) def __str__(self): return "Body(%r)" % self.content + +# TODO: +# OOB_METHOD = "frame_oob_method" +# OOB_HEADER = "frame_oob_header" +# OOB_BODY = "frame_oob_body" +# TRACE = "frame_trace" +# HEARTBEAT = "frame_heartbeat" diff --git a/python/qpid/delegate.py b/python/qpid/delegate.py index 035bb3c476..90e5c1edc8 100644 --- a/python/qpid/delegate.py +++ b/python/qpid/delegate.py @@ -22,33 +22,25 @@ Delegate implementation intended for use with the peer module. """ import threading, inspect -from spec import pythonize +from connection import Method, Request, Response class Delegate: def __init__(self): self.handlers = {} self.invokers = {} - # initialize all the mixins - self.invoke_all("init") - def invoke_all(self, meth, *args, **kwargs): - for cls in inspect.getmro(self.__class__): - if hasattr(cls, meth): - getattr(cls, meth)(self, *args, **kwargs) - - def dispatch(self, channel, message): - method = message.method + def __call__(self, channel, frame): + method = frame.method try: handler = self.handlers[method] except KeyError: - name = "%s_%s" % (pythonize(method.klass.name), - pythonize(method.name)) + name = "%s_%s" % (method.klass.name, method.name) handler = getattr(self, name) self.handlers[method] = handler - return handler(channel, message) + return handler(channel, frame) def close(self, reason): - self.invoke_all("close", reason) + print "Connection closed: %s" % reason diff --git a/python/qpid/message.py b/python/qpid/message.py index 914b878147..29c8654937 100644 --- a/python/qpid/message.py +++ b/python/qpid/message.py @@ -16,22 +16,19 @@ # specific language governing permissions and limitations # under the License. # +from connection import Method, Request from sets import Set class Message: - COMMON_FIELDS = Set(("content", "method", "fields")) - - def __init__(self, method, fields, content = None): - self.method = method - self.fields = fields + def __init__(self, channel, frame, content = None): + self.channel = channel + self.frame = frame + self.method = frame.method_type self.content = content def __len__(self): - l = len(self.fields) - if self.method.content: - l += 1 - return len(self.fields) + return len(self.frame.args) def _idx(self, idx): if idx < 0: idx += len(self) @@ -40,45 +37,29 @@ class Message: return idx def __getitem__(self, idx): - idx = self._idx(idx) - if idx == len(self.fields): - return self.content - else: - return self.fields[idx] - - def __setitem__(self, idx, value): - idx = self._idx(idx) - if idx == len(self.fields): - self.content = value - else: - self.fields[idx] = value + return self.frame.args[idx] - def _slot(self, attr): - if attr in Message.COMMON_FIELDS: - env = self.__dict__ - key = attr + def __getattr__(self, attr): + fields = self.method.fields.byname + if fields.has_key(attr): + f = fields[attr] + result = self[self.method.fields.index(f)] else: - env = self.fields - try: - field = self.method.fields.bypyname[attr] - key = self.method.fields.index(field) - except KeyError: + for r in self.method.responses: + if attr == r.name: + result = lambda *args, **kwargs: \ + self.channel.respond(Method(r, r.arguments(*args, **kwargs)), + self.frame) + break + else: raise AttributeError(attr) - return env, key - - def __getattr__(self, attr): - env, key = self._slot(attr) - return env[key] - - def __setattr__(self, attr, value): - env, key = self._slot(attr) - env[attr] = value + return result STR = "%s %s content = %s" REPR = STR.replace("%s", "%r") def __str__(self): - return Message.STR % (self.method, self.fields, self.content) + return Message.STR % (self.method, self.frame.args, self.content) def __repr__(self): - return Message.REPR % (self.method, self.fields, self.content) + return Message.REPR % (self.method, self.frame.args, self.content) diff --git a/python/qpid/peer.py b/python/qpid/peer.py index acffeb2537..a5686a917d 100644 --- a/python/qpid/peer.py +++ b/python/qpid/peer.py @@ -24,13 +24,30 @@ sorts incoming frames to their intended channels, and dispatches incoming method frames to a delegate. """ -import thread, traceback, socket, sys, logging -from connection import Frame, EOF, Method, Header, Body +import thread, threading, traceback, socket, sys, logging +from connection import EOF, Method, Header, Body, Request, Response from message import Message from queue import Queue, Closed as QueueClosed from content import Content from cStringIO import StringIO +class Sequence: + + def __init__(self, start, step = 1): + # we should keep start for wrap around + self._next = start + self.step = step + self.lock = thread.allocate_lock() + + def next(self): + self.lock.acquire() + try: + result = self._next + self._next += self.step + return result + finally: + self.lock.release() + class Peer: def __init__(self, conn, delegate): @@ -39,8 +56,6 @@ class Peer: self.outgoing = Queue(0) self.work = Queue(0) self.channels = {} - self.Channel = type("Channel%s" % conn.spec.klass.__name__, - (Channel, conn.spec.klass), {}) self.lock = thread.allocate_lock() def channel(self, id): @@ -49,7 +64,7 @@ class Peer: try: ch = self.channels[id] except KeyError: - ch = self.Channel(id, self.outgoing) + ch = Channel(id, self.outgoing, self.conn.spec) self.channels[id] = ch finally: self.lock.release() @@ -64,7 +79,7 @@ class Peer: """Call when an unexpected exception occurs that will kill a thread.""" if message: print >> sys.stderr, message self.close("Fatal error: %s\n%s" % (message or "", traceback.format_exc())) - + def reader(self): try: while True: @@ -74,7 +89,7 @@ class Peer: self.close(e) break ch = self.channel(frame.channel) - ch.dispatch(frame, self.work) + ch.receive(frame, self.work) except: self.fatal() @@ -99,35 +114,70 @@ class Peer: def worker(self): try: while True: - self.dispatch(self.work.get()) + queue = self.work.get() + frame = queue.get() + channel = self.channel(frame.channel) + if frame.method_type.content: + content = read_content(queue) + else: + content = None + + self.delegate(channel, Message(channel, frame, content)) except: self.fatal() - def dispatch(self, queue): - frame = queue.get() - channel = self.channel(frame.channel) - payload = frame.payload - if payload.method.content: - content = read_content(queue) +class Requester: + + def __init__(self, writer): + self.write = writer + self.sequence = Sequence(1) + self.mark = 0 + # request_id -> listener + self.outstanding = {} + + def request(self, method, listener, content = None): + frame = Request(self.sequence.next(), self.mark, method) + self.outstanding[frame.id] = listener + self.write(frame, content) + + def receive(self, channel, frame): + listener = self.outstanding.pop(frame.id) + listener(channel, frame) + +class Responder: + + def __init__(self, writer): + self.write = writer + self.sequence = Sequence(1) + + def respond(self, method, request): + if isinstance(request, Method): + self.write(method) else: - content = None - # Let the caller deal with exceptions thrown here. - message = Message(payload.method, payload.args, content) - self.delegate.dispatch(channel, message) + # XXX: batching + frame = Response(self.sequence.next(), request.id, 0, method) + self.write(frame) class Closed(Exception): pass class Channel: - def __init__(self, id, outgoing): + def __init__(self, id, outgoing, spec): self.id = id self.outgoing = outgoing + self.spec = spec self.incoming = Queue(0) self.responses = Queue(0) self.queue = None self.closed = False self.reason = None + self.requester = Requester(self.write) + self.responder = Responder(self.write) + + # XXX: better switch + self.reliable = False + def close(self, reason): if self.closed: return @@ -136,43 +186,87 @@ class Channel: self.incoming.close() self.responses.close() - def dispatch(self, frame, work): - payload = frame.payload - if isinstance(payload, Method): - if payload.method.response: + def write(self, frame, content = None): + if self.closed: + raise Closed(self.reason) + frame.channel = self.id + self.outgoing.put(frame) + if (isinstance(frame, (Method, Request)) + and content == None + and frame.method_type.content): + content = Content() + if content != None: + self.write_content(frame.method_type.klass, content) + + def write_content(self, klass, content): + size = content.size() + header = Header(klass, content.weight(), size, content.properties) + self.write(header) + for child in content.children: + self.write_content(klass, child) + # should split up if content.body exceeds max frame size + if size > 0: + self.write(Body(content.body)) + + def receive(self, frame, work): + if isinstance(frame, Method): + if frame.method.response: self.queue = self.responses else: self.queue = self.incoming work.put(self.incoming) + elif isinstance(frame, Request): + self.queue = self.incoming + work.put(self.incoming) + elif isinstance(frame, Response): + self.requester.receive(self, frame) + return self.queue.put(frame) - def invoke(self, method, args, content = None): - if self.closed: - raise Closed(self.reason) - frame = Frame(self.id, Method(method, *args)) - self.outgoing.put(frame) + def queue_response(self, channel, frame): + channel.responses.put(frame.method) - if method.content: - if content == None: - content = Content() - self.write_content(method.klass, content, self.outgoing) + def request(self, method, listener, content = None): + self.requester.request(method, listener, content) + + def respond(self, method, request): + self.responder.respond(method, request) + + def invoke(self, type, args, kwargs): + content = kwargs.pop("content", None) + frame = Method(type, type.arguments(*args, **kwargs)) + if self.reliable: + self.request(frame, self.queue_response, content) + try: + resp = self.responses.get() + return Message(self, resp) + except QueueClosed, e: + if self.closed: + raise Closed(self.reason) + else: + raise e + else: + return self.invoke_method(frame, content) + + def invoke_method(self, frame, content = None): + self.write(frame, content) try: # here we depend on all nowait fields being named nowait - f = method.fields.byname["nowait"] - nowait = args[method.fields.index(f)] + f = frame.method.fields.byname["nowait"] + nowait = frame.args[frame.method.fields.index(f)] except KeyError: nowait = False try: - if not nowait and method.responses: - resp = self.responses.get().payload + if not nowait and frame.method.responses: + resp = self.responses.get() if resp.method.content: content = read_content(self.responses) else: content = None - if resp.method in method.responses: - return Message(resp.method, resp.args, content) + if resp.method in frame.method.responses: + return Message(self, resp, content) else: raise ValueError(resp) except QueueClosed, e: @@ -181,19 +275,15 @@ class Channel: else: raise e - def write_content(self, klass, content, queue): - size = content.size() - header = Frame(self.id, Header(klass, content.weight(), size, **content.properties)) - queue.put(header) - for child in content.children: - self.write_content(klass, child, queue) - # should split up if content.body exceeds max frame size - if size > 0: - queue.put(Frame(self.id, Body(content.body))) + def __getattr__(self, name): + type = self.spec.method(name) + if type == None: raise AttributeError(name) + method = lambda *args, **kwargs: self.invoke(type, args, kwargs) + self.__dict__[name] = method + return method def read_content(queue): - frame = queue.get() - header = frame.payload + header = queue.get() children = [] for i in range(header.weight): children.append(read_content(queue)) @@ -202,7 +292,7 @@ def read_content(queue): buf = StringIO() while read < size: body = queue.get() - content = body.payload.content + content = body.content buf.write(content) read += len(content) return Content(buf.getvalue(), children, header.properties.copy()) diff --git a/python/qpid/spec.py b/python/qpid/spec.py index 0e3a477066..3e8b9e37bd 100644 --- a/python/qpid/spec.py +++ b/python/qpid/spec.py @@ -38,21 +38,16 @@ class SpecContainer: self.byname = {} self.byid = {} self.indexes = {} - self.bypyname = {} def add(self, item): if self.byname.has_key(item.name): raise ValueError("duplicate name: %s" % item) if self.byid.has_key(item.id): raise ValueError("duplicate id: %s" % item) - pyname = pythonize(item.name) - if self.bypyname.has_key(pyname): - raise ValueError("duplicate pyname: %s" % item) self.indexes[item] = len(self.items) self.items.append(item) self.byname[item.name] = item self.byid[item.id] = item - self.bypyname[pyname] = item def index(self, item): try: @@ -91,11 +86,23 @@ class Spec(Metadata): self.file = file self.constants = SpecContainer() self.classes = SpecContainer() + # methods indexed by classname_methname + self.methods = {} def post_load(self): self.module = self.define_module("amqp%s%s" % (self.major, self.minor)) self.klass = self.define_class("Amqp%s%s" % (self.major, self.minor)) + def method(self, name): + if not self.methods.has_key(name): + for cls in self.classes: + clen = len(cls.name) + if name.startswith(cls.name) and name[clen] == "_": + end = name[clen + 1:] + if cls.methods.byname.has_key(end): + self.methods[name] = cls.methods.byname[end] + return self.methods.get(name) + def parse_method(self, name): parts = re.split(r"\s*\.\s*", name) if len(parts) != 2: @@ -107,17 +114,16 @@ class Spec(Metadata): module = new.module(name, doc) module.__file__ = self.file for c in self.classes: - classname = pythonize(c.name) - cls = c.define_class(classname) + cls = c.define_class(c.name) cls.__module__ = module.__name__ - setattr(module, classname, cls) + setattr(module, c.name, cls) return module def define_class(self, name): methods = {} for c in self.classes: for m in c.methods: - meth = pythonize(m.klass.name + "_" + m.name) + meth = m.klass.name + "_" + m.name methods[meth] = m.define_method(meth) return type(name, (), methods) @@ -150,8 +156,7 @@ class Class(Metadata): def define_class(self, name): methods = {} for m in self.methods: - meth = pythonize(m.name) - methods[meth] = m.define_method(meth) + methods[m.name] = m.define_method(m.name) return type(name, (), methods) class Method(Metadata): @@ -172,11 +177,35 @@ class Method(Metadata): self.docs = docs self.response = False + def arguments(self, *args, **kwargs): + nargs = len(args) + len(kwargs) + maxargs = len(self.fields) + if nargs > maxargs: + self._type_error("takes at most %s arguments (%s) given", maxargs, nargs) + result = [] + for f in self.fields: + idx = self.fields.index(f) + if idx < len(args): + result.append(args[idx]) + elif kwargs.has_key(f.name): + result.append(kwargs.pop(f.name)) + else: + result.append(Method.DEFAULTS[f.type]) + for key, value in kwargs.items(): + if self.fields.byname.has_key(key): + self._type_error("got multiple values for keyword argument '%s'", key) + else: + self._type_error("got an unexpected keyword argument '%s'", key) + return tuple(result) + + def _type_error(self, msg, *args): + raise TypeError("%s %s" % (self.name, msg % args)) + def docstring(self): s = "\n\n".join([fill(d, 2) for d in [self.description] + self.docs]) for f in self.fields: if f.docs: - s += "\n\n" + "\n\n".join([fill(f.docs[0], 4, pythonize(f.name))] + + s += "\n\n" + "\n\n".join([fill(f.docs[0], 4, f.name)] + [fill(d, 4) for d in f.docs[1:]]) return s @@ -195,16 +224,13 @@ class Method(Metadata): def define_method(self, name): g = {Method.METHOD: self} l = {} - args = [(pythonize(f.name), Method.DEFAULTS[f.type]) for f in self.fields] + args = [(f.name, Method.DEFAULTS[f.type]) for f in self.fields] + methargs = args[:] if self.content: args += [("content", None)] code = "def %s(self, %s):\n" % \ (name, ", ".join(["%s = %r" % a for a in args])) code += " %r\n" % self.docstring() - if self.content: - methargs = args[:-1] - else: - methargs = args argnames = ", ".join([a[0] for a in methargs]) code += " return self.invoke(%s" % Method.METHOD if argnames: @@ -239,7 +265,7 @@ def load_fields(nd, l, domains): type = f_nd["@type"] while domains.has_key(type) and domains[type] != type: type = domains[type] - l.add(Field(f_nd["@name"], f_nd.index(), type, get_docs(f_nd))) + l.add(Field(pythonize(f_nd["@name"]), f_nd.index(), type, get_docs(f_nd))) def load(specfile): doc = xmlutil.parse(specfile) @@ -248,8 +274,8 @@ def load(specfile): # constants for nd in root["constant"]: - const = Constant(spec, nd["@name"], int(nd["@value"]), nd.get("@class"), - get_docs(nd)) + const = Constant(spec, pythonize(nd["@name"]), int(nd["@value"]), + nd.get("@class"), get_docs(nd)) spec.constants.add(const) # domains are typedefs @@ -259,14 +285,14 @@ def load(specfile): # classes for c_nd in root["class"]: - klass = Class(spec, c_nd["@name"], int(c_nd["@index"]), c_nd["@handler"], - get_docs(c_nd)) + klass = Class(spec, pythonize(c_nd["@name"]), int(c_nd["@index"]), + c_nd["@handler"], get_docs(c_nd)) load_fields(c_nd, klass.fields, domains) for m_nd in c_nd["method"]: - meth = Method(klass, m_nd["@name"], + meth = Method(klass, pythonize(m_nd["@name"]), int(m_nd["@index"]), m_nd.get_bool("@content", False), - [nd["@name"] for nd in m_nd["response"]], + [pythonize(nd["@name"]) for nd in m_nd["response"]], m_nd.get_bool("@synchronous", False), m_nd.text, get_docs(m_nd)) diff --git a/python/server b/python/server new file mode 100755 index 0000000000..4204bc8515 --- /dev/null +++ b/python/server @@ -0,0 +1,30 @@ +#!/usr/bin/env python +from qpid import spec +from qpid.connection import Connection, listen +from qpid.delegate import Delegate +from qpid.peer import Peer + +class Server(Delegate): + + def connection_open(self, ch, msg): + msg.open_ok() + + def channel_open(self, ch, msg): + print "channel %s open" % ch.id + msg.open_ok() + + def message_transfer(self, ch, msg): + print msg.body + msg.ok() + + +spec = spec.load("../specs/amqp.0-9.xml") + +for io in listen("0.0.0.0", 5672): + c = Connection(io, spec) + p = Peer(c, Server()) + c.tini() + p.start() + ch = p.channel(0) + ch.connection_start() + ch.connection_tune() diff --git a/python/tests/example.py b/python/tests/example.py index bc84f002e0..a1949ccb9f 100644 --- a/python/tests/example.py +++ b/python/tests/example.py @@ -58,7 +58,7 @@ class ExampleTest (TestBase): # Here we use ordinal arguments. self.exchange_declare(channel, 0, "test", "direct") - + # Here we use keyword arguments. self.queue_declare(channel, queue="test-queue") channel.queue_bind(queue="test-queue", exchange="test", routing_key="key") @@ -85,7 +85,7 @@ class ExampleTest (TestBase): # argument in case the server hangs. By default queue.get() will wait # until a message arrives or the connection to the server dies. msg = queue.get(timeout=10) - + # And check that we got the right response with assertEqual self.assertEqual(body, msg.content.body) |