summaryrefslogtreecommitdiff
path: root/trunk/qpid/python/qpid/spec08.py
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/python/qpid/spec08.py')
-rw-r--r--trunk/qpid/python/qpid/spec08.py504
1 files changed, 0 insertions, 504 deletions
diff --git a/trunk/qpid/python/qpid/spec08.py b/trunk/qpid/python/qpid/spec08.py
deleted file mode 100644
index a0047e7107..0000000000
--- a/trunk/qpid/python/qpid/spec08.py
+++ /dev/null
@@ -1,504 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-This module loads protocol metadata into python objects. It provides
-access to spec metadata via a python object model, and can also
-dynamically creating python methods, classes, and modules based on the
-spec metadata. All the generated methods have proper signatures and
-doc strings based on the spec metadata so the python help system can
-be used to browse the spec documentation. The generated methods all
-dispatch to the self.invoke(meth, args) callback of the containing
-class so that the generated code can be reused in a variety of
-situations.
-"""
-
-import re, new, mllib, qpid
-from util import fill
-
-class SpecContainer:
-
- def __init__(self):
- self.items = []
- self.byname = {}
- self.byid = {}
- self.indexes = {}
-
- def add(self, item):
- if self.byname.has_key(item.name):
- raise ValueError("duplicate name: %s" % item)
- if item.id == None:
- item.id = len(self)
- elif self.byid.has_key(item.id):
- raise ValueError("duplicate id: %s" % item)
- self.indexes[item] = len(self.items)
- self.items.append(item)
- self.byname[item.name] = item
- self.byid[item.id] = item
-
- def index(self, item):
- try:
- return self.indexes[item]
- except KeyError:
- raise ValueError(item)
-
- def __iter__(self):
- return iter(self.items)
-
- def __len__(self):
- return len(self.items)
-
-class Metadata:
-
- PRINT = []
-
- def __init__(self):
- pass
-
- def __str__(self):
- args = map(lambda f: "%s=%s" % (f, getattr(self, f)), self.PRINT)
- return "%s(%s)" % (self.__class__.__name__, ", ".join(args))
-
- def __repr__(self):
- return str(self)
-
-class Spec(Metadata):
-
- PRINT=["major", "minor", "file"]
-
- def __init__(self, major, minor, file):
- Metadata.__init__(self)
- self.major = major
- self.minor = minor
- self.file = file
- self.constants = SpecContainer()
- self.domains = SpecContainer()
- self.classes = SpecContainer()
- # methods indexed by classname_methname
- self.methods = {}
- # structs by type code
- self.structs = {}
-
- def post_load(self):
- self.module = self.define_module("amqp%s%s" % (self.major, self.minor))
- self.klass = self.define_class("Amqp%s%s" % (self.major, self.minor))
-
- def method(self, name):
- if not self.methods.has_key(name):
- for cls in self.classes:
- clen = len(cls.name)
- if name.startswith(cls.name) and name[clen] == "_":
- end = name[clen + 1:]
- if cls.methods.byname.has_key(end):
- self.methods[name] = cls.methods.byname[end]
- return self.methods.get(name)
-
- def parse_method(self, name):
- parts = re.split(r"\s*\.\s*", name)
- if len(parts) != 2:
- raise ValueError(name)
- klass, meth = parts
- return self.classes.byname[klass].methods.byname[meth]
-
- def struct(self, name, *args, **kwargs):
- type = self.domains.byname[name].type
- return qpid.Struct(type, *args, **kwargs)
-
- def define_module(self, name, doc = None):
- module = new.module(name, doc)
- module.__file__ = self.file
- for c in self.classes:
- cls = c.define_class(c.name)
- cls.__module__ = module.__name__
- setattr(module, c.name, cls)
- return module
-
- def define_class(self, name):
- methods = {}
- for c in self.classes:
- for m in c.methods:
- meth = m.klass.name + "_" + m.name
- methods[meth] = m.define_method(meth)
- return type(name, (), methods)
-
-class Constant(Metadata):
-
- PRINT=["name", "id"]
-
- def __init__(self, spec, name, id, klass, docs):
- Metadata.__init__(self)
- self.spec = spec
- self.name = name
- self.id = id
- self.klass = klass
- self.docs = docs
-
-class Domain(Metadata):
-
- PRINT=["name", "type"]
-
- def __init__(self, spec, name, type, description, docs):
- Metadata.__init__(self)
- self.spec = spec
- self.id = None
- self.name = name
- self.type = type
- self.description = description
- self.docs = docs
-
-class Struct(Metadata):
-
- PRINT=["size", "type", "pack"]
-
- def __init__(self, size, type, pack):
- Metadata.__init__(self)
- self.size = size
- self.type = type
- self.pack = pack
- self.fields = SpecContainer()
-
-class Class(Metadata):
-
- PRINT=["name", "id"]
-
- def __init__(self, spec, name, id, handler, docs):
- Metadata.__init__(self)
- self.spec = spec
- self.name = name
- self.id = id
- self.handler = handler
- self.fields = SpecContainer()
- self.methods = SpecContainer()
- self.docs = docs
-
- def define_class(self, name):
- methods = {}
- for m in self.methods:
- methods[m.name] = m.define_method(m.name)
- return type(name, (), methods)
-
-class Method(Metadata):
-
- PRINT=["name", "id"]
-
- def __init__(self, klass, name, id, content, responses, result, synchronous,
- description, docs):
- Metadata.__init__(self)
- self.klass = klass
- self.name = name
- self.id = id
- self.content = content
- self.responses = responses
- self.result = result
- self.synchronous = synchronous
- self.fields = SpecContainer()
- self.description = description
- self.docs = docs
- self.response = False
-
- def is_l4_command(self):
- return self.klass.name not in ["execution", "channel", "connection", "session"]
-
- def arguments(self, *args, **kwargs):
- nargs = len(args) + len(kwargs)
- maxargs = len(self.fields)
- if nargs > maxargs:
- self._type_error("takes at most %s arguments (%s) given", maxargs, nargs)
- result = []
- for f in self.fields:
- idx = self.fields.index(f)
- if idx < len(args):
- result.append(args[idx])
- elif kwargs.has_key(f.name):
- result.append(kwargs.pop(f.name))
- else:
- result.append(Method.DEFAULTS[f.type])
- for key, value in kwargs.items():
- if self.fields.byname.has_key(key):
- self._type_error("got multiple values for keyword argument '%s'", key)
- else:
- self._type_error("got an unexpected keyword argument '%s'", key)
- return tuple(result)
-
- def _type_error(self, msg, *args):
- raise TypeError("%s %s" % (self.name, msg % args))
-
- def docstring(self):
- s = "\n\n".join([fill(d, 2) for d in [self.description] + self.docs])
- for f in self.fields:
- if f.docs:
- s += "\n\n" + "\n\n".join([fill(f.docs[0], 4, f.name)] +
- [fill(d, 4) for d in f.docs[1:]])
- if self.responses:
- s += "\n\nValid responses: "
- for r in self.responses:
- s += r.name + " "
- return s
-
- METHOD = "__method__"
- DEFAULTS = {"bit": False,
- "shortstr": "",
- "longstr": "",
- "table": {},
- "array": [],
- "octet": 0,
- "short": 0,
- "long": 0,
- "longlong": 0,
- "timestamp": 0,
- "content": None,
- "uuid": "",
- "rfc1982_long": 0,
- "rfc1982_long_set": [],
- "long_struct": None}
-
- def define_method(self, name):
- g = {Method.METHOD: self}
- l = {}
- args = [(f.name, Method.DEFAULTS[f.type]) for f in self.fields]
- methargs = args[:]
- if self.content:
- args += [("content", None)]
- code = "def %s(self, %s):\n" % \
- (name, ", ".join(["%s = %r" % a for a in args]))
- code += " %r\n" % self.docstring()
- argnames = ", ".join([a[0] for a in methargs])
- code += " return self.invoke(%s" % Method.METHOD
- if argnames:
- code += ", (%s,)" % argnames
- else:
- code += ", ()"
- if self.content:
- code += ", content"
- code += ")"
- exec code in g, l
- return l[name]
-
-class Field(Metadata):
-
- PRINT=["name", "id", "type"]
-
- def __init__(self, name, id, type, domain, description, docs):
- Metadata.__init__(self)
- self.name = name
- self.id = id
- self.type = type
- self.domain = domain
- self.description = description
- self.docs = docs
-
- def default(self):
- if isinstance(self.type, Struct):
- return None
- else:
- return Method.DEFAULTS[self.type]
-
-WIDTHS = {
- "octet": 1,
- "short": 2,
- "long": 4
- }
-
-def width(st, default=None):
- if st in (None, "none", ""):
- return default
- else:
- return WIDTHS[st]
-
-def get_result(nd, spec):
- result = nd["result"]
- if not result: return None
- name = result["@domain"]
- if name != None: return spec.domains.byname[name]
- st_nd = result["struct"]
- st = Struct(width(st_nd["@size"]), int(result.parent.parent["@index"])*256 +
- int(st_nd["@type"]), width(st_nd["@pack"], 2))
- spec.structs[st.type] = st
- load_fields(st_nd, st.fields, spec.domains.byname)
- return st
-
-def get_desc(nd):
- label = nd["@label"]
- if not label:
- label = nd.text()
- if label:
- label = label.strip()
- return label
-
-def get_docs(nd):
- return [n.text() for n in nd.query["doc"]]
-
-def load_fields(nd, l, domains):
- for f_nd in nd.query["field"]:
- type = f_nd["@domain"]
- if type == None:
- type = f_nd["@type"]
- type = pythonize(type)
- domain = None
- while domains.has_key(type) and domains[type].type != type:
- domain = domains[type]
- type = domain.type
- l.add(Field(pythonize(f_nd["@name"]), f_nd.index(), type, domain,
- get_desc(f_nd), get_docs(f_nd)))
-
-def load(specfile, *errata):
- doc = mllib.xml_parse(specfile)
- spec_root = doc["amqp"]
- spec = Spec(int(spec_root["@major"]), int(spec_root["@minor"]), specfile)
-
- for root in [spec_root] + map(lambda x: mllib.xml_parse(x)["amqp"], errata):
- # constants
- for nd in root.query["constant"]:
- val = nd["@value"]
- if val.startswith("0x"): val = int(val, 16)
- else: val = int(val)
- const = Constant(spec, pythonize(nd["@name"]), val, nd["@class"],
- get_docs(nd))
- try:
- spec.constants.add(const)
- except ValueError, e:
- pass
- #print "Warning:", e
-
- # domains are typedefs
- structs = []
- for nd in root.query["domain"]:
- type = nd["@type"]
- if type == None:
- st_nd = nd["struct"]
- code = st_nd["@type"]
- if code not in (None, "", "none"):
- code = int(code)
- type = Struct(width(st_nd["@size"]), code, width(st_nd["@pack"], 2))
- if type.type != None:
- spec.structs[type.type] = type
- structs.append((type, st_nd))
- else:
- type = pythonize(type)
- domain = Domain(spec, pythonize(nd["@name"]), type, get_desc(nd),
- get_docs(nd))
- spec.domains.add(domain)
-
- # structs
- for st, st_nd in structs:
- load_fields(st_nd, st.fields, spec.domains.byname)
-
- # classes
- for c_nd in root.query["class"]:
- cname = pythonize(c_nd["@name"])
- if spec.classes.byname.has_key(cname):
- klass = spec.classes.byname[cname]
- else:
- klass = Class(spec, cname, int(c_nd["@index"]), c_nd["@handler"],
- get_docs(c_nd))
- spec.classes.add(klass)
-
- added_methods = []
- load_fields(c_nd, klass.fields, spec.domains.byname)
- for m_nd in c_nd.query["method"]:
- mname = pythonize(m_nd["@name"])
- if klass.methods.byname.has_key(mname):
- meth = klass.methods.byname[mname]
- else:
- meth = Method(klass, mname,
- int(m_nd["@index"]),
- m_nd["@content"] == "1",
- [pythonize(nd["@name"]) for nd in m_nd.query["response"]],
- get_result(m_nd, spec),
- m_nd["@synchronous"] == "1",
- get_desc(m_nd),
- get_docs(m_nd))
- klass.methods.add(meth)
- added_methods.append(meth)
- load_fields(m_nd, meth.fields, spec.domains.byname)
- # resolve the responses
- for m in added_methods:
- m.responses = [klass.methods.byname[r] for r in m.responses]
- for resp in m.responses:
- resp.response = True
-
- spec.post_load()
- return spec
-
-REPLACE = {" ": "_", "-": "_"}
-KEYWORDS = {"global": "global_",
- "return": "return_"}
-
-def pythonize(name):
- name = str(name)
- for key, val in REPLACE.items():
- name = name.replace(key, val)
- try:
- name = KEYWORDS[name]
- except KeyError:
- pass
- return name
-
-class Rule(Metadata):
-
- PRINT = ["text", "implement", "tests"]
-
- def __init__(self, text, implement, tests, path):
- self.text = text
- self.implement = implement
- self.tests = tests
- self.path = path
-
-def find_rules(node, rules):
- if node.name == "rule":
- rules.append(Rule(node.text, node.get("@implement"),
- [ch.text for ch in node if ch.name == "test"],
- node.path()))
- if node.name == "doc" and node.get("@name") == "rule":
- tests = []
- if node.has("@test"):
- tests.append(node["@test"])
- rules.append(Rule(node.text, None, tests, node.path()))
- for child in node:
- find_rules(child, rules)
-
-def load_rules(specfile):
- rules = []
- find_rules(xmlutil.parse(specfile), rules)
- return rules
-
-def test_summary():
- template = """
- <html><head><title>AMQP Tests</title></head>
- <body>
- <table width="80%%" align="center">
- %s
- </table>
- </body>
- </html>
- """
- rows = []
- for rule in load_rules("amqp.org/specs/amqp7.xml"):
- if rule.tests:
- tests = ", ".join(rule.tests)
- else:
- tests = "&nbsp;"
- rows.append('<tr bgcolor="#EEEEEE"><td><b>Path:</b> %s</td>'
- '<td><b>Implement:</b> %s</td>'
- '<td><b>Tests:</b> %s</td></tr>' %
- (rule.path[len("/root/amqp"):], rule.implement, tests))
- rows.append('<tr><td colspan="3">%s</td></tr>' % rule.text)
- rows.append('<tr><td colspan="3">&nbsp;</td></tr>')
-
- print template % "\n".join(rows)