diff options
Diffstat (limited to 'python/qpid/spec.py')
-rw-r--r-- | python/qpid/spec.py | 349 |
1 files changed, 349 insertions, 0 deletions
diff --git a/python/qpid/spec.py b/python/qpid/spec.py new file mode 100644 index 0000000000..70e09aa1e9 --- /dev/null +++ b/python/qpid/spec.py @@ -0,0 +1,349 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +This module loads protocol metadata into python objects. It provides +access to spec metadata via a python object model, and can also +dynamically creating python methods, classes, and modules based on the +spec metadata. All the generated methods have proper signatures and +doc strings based on the spec metadata so the python help system can +be used to browse the spec documentation. The generated methods all +dispatch to the self.invoke(meth, args) callback of the containing +class so that the generated code can be reused in a variety of +situations. +""" + +import re, textwrap, new, xmlutil + +class SpecContainer: + + def __init__(self): + self.items = [] + self.byname = {} + self.byid = {} + self.indexes = {} + self.bypyname = {} + + def add(self, item): + if self.byname.has_key(item.name): + raise ValueError("duplicate name: %s" % item) + if self.byid.has_key(item.id): + raise ValueError("duplicate id: %s" % item) + pyname = pythonize(item.name) + if self.bypyname.has_key(pyname): + raise ValueError("duplicate pyname: %s" % item) + self.indexes[item] = len(self.items) + self.items.append(item) + self.byname[item.name] = item + self.byid[item.id] = item + self.bypyname[pyname] = item + + def index(self, item): + try: + return self.indexes[item] + except KeyError: + raise ValueError(item) + + def __iter__(self): + return iter(self.items) + + def __len__(self): + return len(self.items) + +class Metadata: + + PRINT = [] + + def __init__(self): + pass + + def __str__(self): + args = map(lambda f: "%s=%s" % (f, getattr(self, f)), self.PRINT) + return "%s(%s)" % (self.__class__.__name__, ", ".join(args)) + + def __repr__(self): + return str(self) + +class Spec(Metadata): + + PRINT=["major", "minor", "file"] + + def __init__(self, major, minor, file): + Metadata.__init__(self) + self.major = major + self.minor = minor + self.file = file + self.constants = SpecContainer() + self.classes = SpecContainer() + + def post_load(self): + self.module = self.define_module("amqp%s%s" % (self.major, self.minor)) + self.klass = self.define_class("Amqp%s%s" % (self.major, self.minor)) + + def parse_method(self, name): + parts = re.split(r"\s*\.\s*", name) + if len(parts) != 2: + raise ValueError(name) + klass, meth = parts + return self.classes.byname[klass].methods.byname[meth] + + def define_module(self, name, doc = None): + module = new.module(name, doc) + module.__file__ = self.file + for c in self.classes: + classname = pythonize(c.name) + cls = c.define_class(classname) + cls.__module__ = module.__name__ + setattr(module, classname, cls) + return module + + def define_class(self, name): + methods = {} + for c in self.classes: + for m in c.methods: + meth = pythonize(m.klass.name + "_" + m.name) + methods[meth] = m.define_method(meth) + return type(name, (), methods) + +class Constant(Metadata): + + PRINT=["name", "id"] + + def __init__(self, spec, name, id, klass, docs): + Metadata.__init__(self) + self.spec = spec + self.name = name + self.id = id + self.klass = klass + self.docs = docs + +class Class(Metadata): + + PRINT=["name", "id"] + + def __init__(self, spec, name, id, handler, docs): + Metadata.__init__(self) + self.spec = spec + self.name = name + self.id = id + self.handler = handler + self.fields = SpecContainer() + self.methods = SpecContainer() + self.docs = docs + + def define_class(self, name): + methods = {} + for m in self.methods: + meth = pythonize(m.name) + methods[meth] = m.define_method(meth) + return type(name, (), methods) + +class Method(Metadata): + + PRINT=["name", "id"] + + def __init__(self, klass, name, id, content, responses, synchronous, + description, docs): + Metadata.__init__(self) + self.klass = klass + self.name = name + self.id = id + self.content = content + self.responses = responses + self.synchronous = synchronous + self.fields = SpecContainer() + self.description = description + self.docs = docs + self.response = False + + def docstring(self): + s = "\n\n".join([fill(d, 2) for d in [self.description] + self.docs]) + for f in self.fields: + if f.docs: + s += "\n\n" + "\n\n".join([fill(f.docs[0], 4, pythonize(f.name))] + + [fill(d, 4) for d in f.docs[1:]]) + return s + + METHOD = "__method__" + DEFAULTS = {"bit": False, + "shortstr": "", + "longstr": "", + "table": {}, + "octet": 0, + "short": 0, + "long": 0, + "longlong": 0} + + def define_method(self, name): + g = {Method.METHOD: self} + l = {} + args = [(pythonize(f.name), Method.DEFAULTS[f.type]) for f in self.fields] + if self.content: + args += [("content", None)] + code = "def %s(self, %s):\n" % \ + (name, ", ".join(["%s = %r" % a for a in args])) + code += " %r\n" % self.docstring() + if self.content: + methargs = args[:-1] + else: + methargs = args + argnames = ", ".join([a[0] for a in methargs]) + code += " return self.invoke(%s" % Method.METHOD + if argnames: + code += ", (%s,)" % argnames + if self.content: + code += ", content" + code += ")" + exec code in g, l + return l[name] + +class Field(Metadata): + + PRINT=["name", "id", "type"] + + def __init__(self, name, id, type, docs): + Metadata.__init__(self) + self.name = name + self.id = id + self.type = type + self.docs = docs + +def get_docs(nd): + return [n.text for n in nd["doc"]] + +def load_fields(nd, l, domains): + for f_nd in nd["field"]: + try: + type = f_nd["@type"] + except KeyError: + type = domains[f_nd["@domain"]] + l.add(Field(f_nd["@name"], f_nd.index(), type, get_docs(f_nd))) + +def load(specfile): + doc = xmlutil.parse(specfile) + root = doc["amqp"][0] + spec = Spec(int(root["@major"]), int(root["@minor"]), specfile) + + # constants + for nd in root["constant"]: + const = Constant(spec, nd["@name"], int(nd["@value"]), nd.get("@class"), + get_docs(nd)) + spec.constants.add(const) + + # domains are typedefs + domains = {} + for nd in root["domain"]: + domains[nd["@name"]] = nd["@type"] + + # classes + for c_nd in root["class"]: + klass = Class(spec, c_nd["@name"], int(c_nd["@index"]), c_nd["@handler"], + get_docs(c_nd)) + load_fields(c_nd, klass.fields, domains) + for m_nd in c_nd["method"]: + meth = Method(klass, m_nd["@name"], + int(m_nd["@index"]), + m_nd.get_bool("@content", False), + [nd["@name"] for nd in m_nd["response"]], + m_nd.get_bool("@synchronous", False), + m_nd.text, + get_docs(m_nd)) + load_fields(m_nd, meth.fields, domains) + klass.methods.add(meth) + # resolve the responses + for m in klass.methods: + m.responses = [klass.methods.byname[r] for r in m.responses] + for resp in m.responses: + resp.response = True + spec.classes.add(klass) + spec.post_load() + return spec + +REPLACE = {" ": "_", "-": "_"} +KEYWORDS = {"global": "global_", + "return": "return_"} + +def pythonize(name): + name = str(name) + for key, val in REPLACE.items(): + name = name.replace(key, val) + try: + name = KEYWORDS[name] + except KeyError: + pass + return name + +def fill(text, indent, heading = None): + sub = indent * " " + if heading: + init = (indent - 2) * " " + heading + " -- " + else: + init = sub + w = textwrap.TextWrapper(initial_indent = init, subsequent_indent = sub) + return w.fill(" ".join(text.split())) + +class Rule(Metadata): + + PRINT = ["text", "implement", "tests"] + + def __init__(self, text, implement, tests, path): + self.text = text + self.implement = implement + self.tests = tests + self.path = path + +def find_rules(node, rules): + if node.name == "rule": + rules.append(Rule(node.text, node.get("@implement"), + [ch.text for ch in node if ch.name == "test"], + node.path())) + if node.name == "doc" and node.get("@name") == "rule": + tests = [] + if node.has("@test"): + tests.append(node["@test"]) + rules.append(Rule(node.text, None, tests, node.path())) + for child in node: + find_rules(child, rules) + +def load_rules(specfile): + rules = [] + find_rules(xmlutil.parse(specfile), rules) + return rules + +def test_summary(): + template = """ + <html><head><title>AMQP Tests</title></head> + <body> + <table width="80%%" align="center"> + %s + </table> + </body> + </html> + """ + rows = [] + for rule in load_rules("amqp.org/specs/amqp7.xml"): + if rule.tests: + tests = ", ".join(rule.tests) + else: + tests = " " + rows.append('<tr bgcolor="#EEEEEE"><td><b>Path:</b> %s</td>' + '<td><b>Implement:</b> %s</td>' + '<td><b>Tests:</b> %s</td></tr>' % + (rule.path[len("/root/amqp"):], rule.implement, tests)) + rows.append('<tr><td colspan="3">%s</td></tr>' % rule.text) + rows.append('<tr><td colspan="3"> </td></tr>') + + print template % "\n".join(rows) |