diff options
Diffstat (limited to 'qpid/python/qpid/spec08.py')
-rw-r--r-- | qpid/python/qpid/spec08.py | 504 |
1 files changed, 504 insertions, 0 deletions
diff --git a/qpid/python/qpid/spec08.py b/qpid/python/qpid/spec08.py new file mode 100644 index 0000000000..a0047e7107 --- /dev/null +++ b/qpid/python/qpid/spec08.py @@ -0,0 +1,504 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +This module loads protocol metadata into python objects. It provides +access to spec metadata via a python object model, and can also +dynamically creating python methods, classes, and modules based on the +spec metadata. All the generated methods have proper signatures and +doc strings based on the spec metadata so the python help system can +be used to browse the spec documentation. The generated methods all +dispatch to the self.invoke(meth, args) callback of the containing +class so that the generated code can be reused in a variety of +situations. +""" + +import re, new, mllib, qpid +from util import fill + +class SpecContainer: + + def __init__(self): + self.items = [] + self.byname = {} + self.byid = {} + self.indexes = {} + + def add(self, item): + if self.byname.has_key(item.name): + raise ValueError("duplicate name: %s" % item) + if item.id == None: + item.id = len(self) + elif self.byid.has_key(item.id): + raise ValueError("duplicate id: %s" % item) + self.indexes[item] = len(self.items) + self.items.append(item) + self.byname[item.name] = item + self.byid[item.id] = item + + def index(self, item): + try: + return self.indexes[item] + except KeyError: + raise ValueError(item) + + def __iter__(self): + return iter(self.items) + + def __len__(self): + return len(self.items) + +class Metadata: + + PRINT = [] + + def __init__(self): + pass + + def __str__(self): + args = map(lambda f: "%s=%s" % (f, getattr(self, f)), self.PRINT) + return "%s(%s)" % (self.__class__.__name__, ", ".join(args)) + + def __repr__(self): + return str(self) + +class Spec(Metadata): + + PRINT=["major", "minor", "file"] + + def __init__(self, major, minor, file): + Metadata.__init__(self) + self.major = major + self.minor = minor + self.file = file + self.constants = SpecContainer() + self.domains = SpecContainer() + self.classes = SpecContainer() + # methods indexed by classname_methname + self.methods = {} + # structs by type code + self.structs = {} + + def post_load(self): + self.module = self.define_module("amqp%s%s" % (self.major, self.minor)) + self.klass = self.define_class("Amqp%s%s" % (self.major, self.minor)) + + def method(self, name): + if not self.methods.has_key(name): + for cls in self.classes: + clen = len(cls.name) + if name.startswith(cls.name) and name[clen] == "_": + end = name[clen + 1:] + if cls.methods.byname.has_key(end): + self.methods[name] = cls.methods.byname[end] + return self.methods.get(name) + + def parse_method(self, name): + parts = re.split(r"\s*\.\s*", name) + if len(parts) != 2: + raise ValueError(name) + klass, meth = parts + return self.classes.byname[klass].methods.byname[meth] + + def struct(self, name, *args, **kwargs): + type = self.domains.byname[name].type + return qpid.Struct(type, *args, **kwargs) + + def define_module(self, name, doc = None): + module = new.module(name, doc) + module.__file__ = self.file + for c in self.classes: + cls = c.define_class(c.name) + cls.__module__ = module.__name__ + setattr(module, c.name, cls) + return module + + def define_class(self, name): + methods = {} + for c in self.classes: + for m in c.methods: + meth = m.klass.name + "_" + m.name + methods[meth] = m.define_method(meth) + return type(name, (), methods) + +class Constant(Metadata): + + PRINT=["name", "id"] + + def __init__(self, spec, name, id, klass, docs): + Metadata.__init__(self) + self.spec = spec + self.name = name + self.id = id + self.klass = klass + self.docs = docs + +class Domain(Metadata): + + PRINT=["name", "type"] + + def __init__(self, spec, name, type, description, docs): + Metadata.__init__(self) + self.spec = spec + self.id = None + self.name = name + self.type = type + self.description = description + self.docs = docs + +class Struct(Metadata): + + PRINT=["size", "type", "pack"] + + def __init__(self, size, type, pack): + Metadata.__init__(self) + self.size = size + self.type = type + self.pack = pack + self.fields = SpecContainer() + +class Class(Metadata): + + PRINT=["name", "id"] + + def __init__(self, spec, name, id, handler, docs): + Metadata.__init__(self) + self.spec = spec + self.name = name + self.id = id + self.handler = handler + self.fields = SpecContainer() + self.methods = SpecContainer() + self.docs = docs + + def define_class(self, name): + methods = {} + for m in self.methods: + methods[m.name] = m.define_method(m.name) + return type(name, (), methods) + +class Method(Metadata): + + PRINT=["name", "id"] + + def __init__(self, klass, name, id, content, responses, result, synchronous, + description, docs): + Metadata.__init__(self) + self.klass = klass + self.name = name + self.id = id + self.content = content + self.responses = responses + self.result = result + self.synchronous = synchronous + self.fields = SpecContainer() + self.description = description + self.docs = docs + self.response = False + + def is_l4_command(self): + return self.klass.name not in ["execution", "channel", "connection", "session"] + + def arguments(self, *args, **kwargs): + nargs = len(args) + len(kwargs) + maxargs = len(self.fields) + if nargs > maxargs: + self._type_error("takes at most %s arguments (%s) given", maxargs, nargs) + result = [] + for f in self.fields: + idx = self.fields.index(f) + if idx < len(args): + result.append(args[idx]) + elif kwargs.has_key(f.name): + result.append(kwargs.pop(f.name)) + else: + result.append(Method.DEFAULTS[f.type]) + for key, value in kwargs.items(): + if self.fields.byname.has_key(key): + self._type_error("got multiple values for keyword argument '%s'", key) + else: + self._type_error("got an unexpected keyword argument '%s'", key) + return tuple(result) + + def _type_error(self, msg, *args): + raise TypeError("%s %s" % (self.name, msg % args)) + + def docstring(self): + s = "\n\n".join([fill(d, 2) for d in [self.description] + self.docs]) + for f in self.fields: + if f.docs: + s += "\n\n" + "\n\n".join([fill(f.docs[0], 4, f.name)] + + [fill(d, 4) for d in f.docs[1:]]) + if self.responses: + s += "\n\nValid responses: " + for r in self.responses: + s += r.name + " " + return s + + METHOD = "__method__" + DEFAULTS = {"bit": False, + "shortstr": "", + "longstr": "", + "table": {}, + "array": [], + "octet": 0, + "short": 0, + "long": 0, + "longlong": 0, + "timestamp": 0, + "content": None, + "uuid": "", + "rfc1982_long": 0, + "rfc1982_long_set": [], + "long_struct": None} + + def define_method(self, name): + g = {Method.METHOD: self} + l = {} + args = [(f.name, Method.DEFAULTS[f.type]) for f in self.fields] + methargs = args[:] + if self.content: + args += [("content", None)] + code = "def %s(self, %s):\n" % \ + (name, ", ".join(["%s = %r" % a for a in args])) + code += " %r\n" % self.docstring() + argnames = ", ".join([a[0] for a in methargs]) + code += " return self.invoke(%s" % Method.METHOD + if argnames: + code += ", (%s,)" % argnames + else: + code += ", ()" + if self.content: + code += ", content" + code += ")" + exec code in g, l + return l[name] + +class Field(Metadata): + + PRINT=["name", "id", "type"] + + def __init__(self, name, id, type, domain, description, docs): + Metadata.__init__(self) + self.name = name + self.id = id + self.type = type + self.domain = domain + self.description = description + self.docs = docs + + def default(self): + if isinstance(self.type, Struct): + return None + else: + return Method.DEFAULTS[self.type] + +WIDTHS = { + "octet": 1, + "short": 2, + "long": 4 + } + +def width(st, default=None): + if st in (None, "none", ""): + return default + else: + return WIDTHS[st] + +def get_result(nd, spec): + result = nd["result"] + if not result: return None + name = result["@domain"] + if name != None: return spec.domains.byname[name] + st_nd = result["struct"] + st = Struct(width(st_nd["@size"]), int(result.parent.parent["@index"])*256 + + int(st_nd["@type"]), width(st_nd["@pack"], 2)) + spec.structs[st.type] = st + load_fields(st_nd, st.fields, spec.domains.byname) + return st + +def get_desc(nd): + label = nd["@label"] + if not label: + label = nd.text() + if label: + label = label.strip() + return label + +def get_docs(nd): + return [n.text() for n in nd.query["doc"]] + +def load_fields(nd, l, domains): + for f_nd in nd.query["field"]: + type = f_nd["@domain"] + if type == None: + type = f_nd["@type"] + type = pythonize(type) + domain = None + while domains.has_key(type) and domains[type].type != type: + domain = domains[type] + type = domain.type + l.add(Field(pythonize(f_nd["@name"]), f_nd.index(), type, domain, + get_desc(f_nd), get_docs(f_nd))) + +def load(specfile, *errata): + doc = mllib.xml_parse(specfile) + spec_root = doc["amqp"] + spec = Spec(int(spec_root["@major"]), int(spec_root["@minor"]), specfile) + + for root in [spec_root] + map(lambda x: mllib.xml_parse(x)["amqp"], errata): + # constants + for nd in root.query["constant"]: + val = nd["@value"] + if val.startswith("0x"): val = int(val, 16) + else: val = int(val) + const = Constant(spec, pythonize(nd["@name"]), val, nd["@class"], + get_docs(nd)) + try: + spec.constants.add(const) + except ValueError, e: + pass + #print "Warning:", e + + # domains are typedefs + structs = [] + for nd in root.query["domain"]: + type = nd["@type"] + if type == None: + st_nd = nd["struct"] + code = st_nd["@type"] + if code not in (None, "", "none"): + code = int(code) + type = Struct(width(st_nd["@size"]), code, width(st_nd["@pack"], 2)) + if type.type != None: + spec.structs[type.type] = type + structs.append((type, st_nd)) + else: + type = pythonize(type) + domain = Domain(spec, pythonize(nd["@name"]), type, get_desc(nd), + get_docs(nd)) + spec.domains.add(domain) + + # structs + for st, st_nd in structs: + load_fields(st_nd, st.fields, spec.domains.byname) + + # classes + for c_nd in root.query["class"]: + cname = pythonize(c_nd["@name"]) + if spec.classes.byname.has_key(cname): + klass = spec.classes.byname[cname] + else: + klass = Class(spec, cname, int(c_nd["@index"]), c_nd["@handler"], + get_docs(c_nd)) + spec.classes.add(klass) + + added_methods = [] + load_fields(c_nd, klass.fields, spec.domains.byname) + for m_nd in c_nd.query["method"]: + mname = pythonize(m_nd["@name"]) + if klass.methods.byname.has_key(mname): + meth = klass.methods.byname[mname] + else: + meth = Method(klass, mname, + int(m_nd["@index"]), + m_nd["@content"] == "1", + [pythonize(nd["@name"]) for nd in m_nd.query["response"]], + get_result(m_nd, spec), + m_nd["@synchronous"] == "1", + get_desc(m_nd), + get_docs(m_nd)) + klass.methods.add(meth) + added_methods.append(meth) + load_fields(m_nd, meth.fields, spec.domains.byname) + # resolve the responses + for m in added_methods: + m.responses = [klass.methods.byname[r] for r in m.responses] + for resp in m.responses: + resp.response = True + + spec.post_load() + return spec + +REPLACE = {" ": "_", "-": "_"} +KEYWORDS = {"global": "global_", + "return": "return_"} + +def pythonize(name): + name = str(name) + for key, val in REPLACE.items(): + name = name.replace(key, val) + try: + name = KEYWORDS[name] + except KeyError: + pass + return name + +class Rule(Metadata): + + PRINT = ["text", "implement", "tests"] + + def __init__(self, text, implement, tests, path): + self.text = text + self.implement = implement + self.tests = tests + self.path = path + +def find_rules(node, rules): + if node.name == "rule": + rules.append(Rule(node.text, node.get("@implement"), + [ch.text for ch in node if ch.name == "test"], + node.path())) + if node.name == "doc" and node.get("@name") == "rule": + tests = [] + if node.has("@test"): + tests.append(node["@test"]) + rules.append(Rule(node.text, None, tests, node.path())) + for child in node: + find_rules(child, rules) + +def load_rules(specfile): + rules = [] + find_rules(xmlutil.parse(specfile), rules) + return rules + +def test_summary(): + template = """ + <html><head><title>AMQP Tests</title></head> + <body> + <table width="80%%" align="center"> + %s + </table> + </body> + </html> + """ + rows = [] + for rule in load_rules("amqp.org/specs/amqp7.xml"): + if rule.tests: + tests = ", ".join(rule.tests) + else: + tests = " " + rows.append('<tr bgcolor="#EEEEEE"><td><b>Path:</b> %s</td>' + '<td><b>Implement:</b> %s</td>' + '<td><b>Tests:</b> %s</td></tr>' % + (rule.path[len("/root/amqp"):], rule.implement, tests)) + rows.append('<tr><td colspan="3">%s</td></tr>' % rule.text) + rows.append('<tr><td colspan="3"> </td></tr>') + + print template % "\n".join(rows) |