diff options
Diffstat (limited to 'java/common')
42 files changed, 1493 insertions, 1044 deletions
diff --git a/java/common/Composite.tpl b/java/common/Composite.tpl new file mode 100644 index 0000000000..4172ffdefc --- /dev/null +++ b/java/common/Composite.tpl @@ -0,0 +1,173 @@ +package org.apache.qpidity.transport; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.qpidity.transport.codec.Decoder; +import org.apache.qpidity.transport.codec.Encodable; +import org.apache.qpidity.transport.codec.Encoder; + +import org.apache.qpidity.transport.network.Frame; + +${ +from genutil import * + +cls = klass(type)["@name"] + +if type.name in ("control", "command"): + base = "Method" + size = 0 + pack = 2 + if type["segments"]: + payload = "true" + else: + payload = "false" + if type.name == "control" and cls == "connection": + track = "Frame.L1" + elif cls == "session" and type["@name"] in ("attach", "attached", "detach", "detached"): + track = "Frame.L2" + elif type.name == "command": + track = "Frame.L4" + else: + track = "Frame.L3" +else: + base = "Struct" + size = type["@size"] + pack = type["@pack"] + payload = "false" + track = "-1" + +typecode = code(type) +} + +public class $name extends $base { + + public static final int TYPE = $typecode; + + public final int getStructType() { + return TYPE; + } + + public final int getSizeWidth() { + return $size; + } + + public final int getPackWidth() { + return $pack; + } + + public final boolean hasPayload() { + return $payload; + } + + public final byte getEncodedTrack() { + return $track; + } + + private static final List<Field<?,?>> FIELDS = new ArrayList<Field<?,?>>(); + public List<Field<?,?>> getFields() { return FIELDS; } + +${ +fields = get_fields(type) +params = get_parameters(fields) +options = get_options(fields) + +for f in fields: + out(" private boolean has_$(f.name);\n") + out(" private $(f.type) $(f.name);\n") +} + +${ +if fields: + out(" public $name() {}\n") +} + + public $name($(", ".join(params))) { +${ +for f in fields: + if f.option: continue + out(" $(f.set)($(f.name));\n") + +for f in options: + out(" boolean _$(f.name) = false;\n") + +if options: + out(""" + for (int i=0; i < _options.length; i++) { + switch (_options[i]) { +""") + + for f in options: + out(" case $(f.option): _$(f.name) = true; break;\n") + + out(""" case NO_OPTION: break; + default: throw new IllegalArgumentException("invalid option: " + _options[i]); + } + } +""") + +for f in options: + out(" $(f.set)(_$(f.name));\n") +} + } + + public <C> void dispatch(C context, MethodDelegate<C> delegate) { + delegate.$(dromedary(name))(context, this); + } + +${ +for f in fields: + out(""" + public final boolean $(f.has)() { + return has_$(f.name); + } + + public final $name $(f.clear)() { + this.has_$(f.name) = false; + this.$(f.name) = $(f.default); + this.dirty = true; + return this; + } + + public final $(f.type) $(f.get)() { + return $(f.name); + } + + public final $name $(f.set)($(f.type) value) { + this.$(f.name) = value; + this.has_$(f.name) = true; + this.dirty = true; + return this; + } + + public final $name $(f.name)($(f.type) value) { + this.$(f.name) = value; + this.has_$(f.name) = true; + this.dirty = true; + return this; + } + + static { + FIELDS.add(new Field<$name,$(jref(jclass(f.type)))>($name.class, $(jref(jclass(f.type))).class, "$(f.name)", $(f.index)) { + public boolean has(Object struct) { + return check(struct).has_$(f.name); + } + public void has(Object struct, boolean value) { + check(struct).has_$(f.name) = value; + } + public $(jref(f.type)) get(Object struct) { + return check(struct).$(f.get)(); + } + public void read(Decoder dec, Object struct) { + check(struct).$(f.name) = $(f.read); + check(struct).dirty = true; + } + public void write(Encoder enc, Object struct) { + $(f.write); + } + }); + } +""") +}} diff --git a/java/common/Constant.tpl b/java/common/Constant.tpl new file mode 100644 index 0000000000..695812ea75 --- /dev/null +++ b/java/common/Constant.tpl @@ -0,0 +1,14 @@ +package org.apache.qpidity.transport; + +${from genutil import *} + +public interface Constant +{ +${ +constants = spec.query["amqp/constant"] + +for c in constants: + name = scream(c["@name"]) + value = c["@value"] + out(" public static final int $name = $value;\n") +}} diff --git a/java/common/Enum.tpl b/java/common/Enum.tpl new file mode 100644 index 0000000000..337feb7065 --- /dev/null +++ b/java/common/Enum.tpl @@ -0,0 +1,36 @@ +package org.apache.qpidity.transport; + +public enum $name { +${ +from genutil import * + +vtype = jtype(resolve_type(type)) + +choices = [(scream(ch["@name"]), "(%s) %s" % (vtype, ch["@value"])) + for ch in type.query["enum/choice"]] +} + $(",\n ".join(["%s(%s)" % ch for ch in choices])); + + private final $vtype value; + + $name($vtype value) + { + this.value = value; + } + + public $vtype getValue() + { + return value; + } + + public static $name get($vtype value) + { + switch (value) + { +${ +for ch, value in choices: + out(' case $value: return $ch;\n') +} default: throw new IllegalArgumentException("no such value: " + value); + } + } +} diff --git a/java/common/Invoker.tpl b/java/common/Invoker.tpl new file mode 100644 index 0000000000..d9905c71a0 --- /dev/null +++ b/java/common/Invoker.tpl @@ -0,0 +1,41 @@ +package org.apache.qpidity.transport; + +import java.util.List; +import java.util.Map; +import java.util.UUID; + +public abstract class Invoker { + + protected abstract void invoke(Method method); + protected abstract <T> Future<T> invoke(Method method, Class<T> resultClass); + +${ +from genutil import * + +for c in composites: + name = cname(c) + fields = get_fields(c) + params = get_parameters(fields) + args = get_arguments(fields) + result = c["result"] + if result: + if not result["@type"]: + rname = cname(result["struct"]) + else: + rname = cname(result, "@type") + jresult = "Future<%s>" % rname + jreturn = "return " + jclass = ", %s.class" % rname + else: + jresult = "void" + jreturn = "" + jclass = "" + + out(""" + public $jresult $(dromedary(name))($(", ".join(params))) { + $(jreturn)invoke(new $name($(", ".join(args)))$jclass); + } +""") +} + +} diff --git a/java/common/MethodDelegate.tpl b/java/common/MethodDelegate.tpl new file mode 100644 index 0000000000..e5ab1ae1e7 --- /dev/null +++ b/java/common/MethodDelegate.tpl @@ -0,0 +1,12 @@ +package org.apache.qpidity.transport; + +public abstract class MethodDelegate<C> { + +${ +from genutil import * + +for c in composites: + name = cname(c) + out(" public void $(dromedary(name))(C context, $name struct) {}\n") +} +} diff --git a/java/common/Option.tpl b/java/common/Option.tpl new file mode 100644 index 0000000000..5fa2b95b9f --- /dev/null +++ b/java/common/Option.tpl @@ -0,0 +1,19 @@ +package org.apache.qpidity.transport; + +public enum Option { + +${ +from genutil import * + +options = {} + +for c in composites: + for f in c.query["field"]: + t = resolve_type(f) + if t["@name"] == "bit": + option = scream(f["@name"]) + if not options.has_key(option): + options[option] = None + out(" $option,\n")} + NO_OPTION +} diff --git a/java/common/StructFactory.tpl b/java/common/StructFactory.tpl new file mode 100644 index 0000000000..b27621b1d2 --- /dev/null +++ b/java/common/StructFactory.tpl @@ -0,0 +1,39 @@ +package org.apache.qpidity.transport; + +class StructFactory { + + public static Struct create(int type) + { + switch (type) + { +${ +from genutil import * + +fragment = """ case $name.TYPE: + return new $name(); +""" + +for c in composites: + name = cname(c) + if c.name == "struct": + out(fragment) +} default: + throw new IllegalArgumentException("type: " + type); + } + } + + public static Struct createInstruction(int type) + { + switch (type) + { +${ +for c in composites: + name = cname(c) + if c.name in ("command", "control"): + out(fragment) +} default: + throw new IllegalArgumentException("type: " + type); + } + } + +} diff --git a/java/common/Type.tpl b/java/common/Type.tpl new file mode 100644 index 0000000000..c869934538 --- /dev/null +++ b/java/common/Type.tpl @@ -0,0 +1,63 @@ +package org.apache.qpidity.transport; + +${from genutil import *} + +public enum Type +{ + +${ +types = spec.query["amqp/type"] + spec.query["amqp/class/type"] +codes = {} +first = True +for t in types: + code = t["@code"] + fix_width = t["@fixed-width"] + var_width = t["@variable-width"] + + if code is None: + continue + + if fix_width is None: + width = var_width + fixed = "false" + else: + width = fix_width + fixed = "true" + + name = scream(t["@name"]) + codes[code] = name + + if first: + first = False + else: + out(",\n") + + out(" $name((byte) $code, $width, $fixed)") +}; + + public byte code; + public int width; + public boolean fixed; + + Type(byte code, int width, boolean fixed) + { + this.code = code; + this.width = width; + this.fixed = fixed; + } + + public static Type get(byte code) + { + switch (code) + { +${ +keys = list(codes.keys()) +keys.sort() + +for code in keys: + out(" case (byte) $code: return $(codes[code]);\n") +} + default: return null; + } + } +} diff --git a/java/common/build.xml b/java/common/build.xml index 6c8bfbaf0f..796385eff3 100644 --- a/java/common/build.xml +++ b/java/common/build.xml @@ -34,7 +34,7 @@ <target name="check_jython_deps"> <uptodate property="jython.notRequired" targetfile="${jython.timestamp}"> - <srcfiles dir="${xml.spec.dir}" includes="amqp.0-10-preview.xml" /> + <srcfiles dir="${xml.spec.dir}" includes="amqp.0-10-qpid-errata.xml" /> </uptodate> </target> @@ -42,10 +42,9 @@ <java classname="org.python.util.jython" fork="true" failonerror="true"> <arg value="-Dpython.cachedir.skip=true"/> <arg value="-Dpython.path=${basedir}/jython-lib.jar/Lib${path.separator}${mllib.dir}"/> - <arg value="${basedir}/generate"/> + <arg value="${basedir}/codegen"/> <arg value="${module.precompiled}"/> - <arg value="org.apache.qpidity.transport"/> - <arg value="${xml.spec.dir}/amqp.0-10-preview.xml"/> + <arg value="${xml.spec.dir}/amqp.0-10-qpid-errata.xml"/> <classpath> <pathelement location="jython-2.2-rc2.jar"/> </classpath> diff --git a/java/common/codegen b/java/common/codegen new file mode 100755 index 0000000000..f5d1577774 --- /dev/null +++ b/java/common/codegen @@ -0,0 +1,63 @@ +#!/usr/bin/env python + +import os, sys, mllib +from templating import Parser +from genutil import * + +out_dir = sys.argv[1] +spec_file = sys.argv[2] +pkg_dir = os.path.join(out_dir, "org/apache/qpidity/transport") + +if not os.path.exists(pkg_dir): + os.makedirs(pkg_dir) + +spec = mllib.xml_parse(spec_file) + +def excludes(nd): + if (nd.parent is not None and + nd.parent.name == "class" and + nd.parent["@name"] in ("file", "stream")): + return False + else: + return True + +def execute(output, template, **kwargs): + f = open(template) + input = f.read() + f.close() + p = Parser(**kwargs) + p.parse(input) + fname = os.path.join(pkg_dir, output) + f = open(fname, "w") + f.write(p.output) + f.close() + +execute("Type.java", "Type.tpl", spec = spec) +execute("Constant.java", "Constant.tpl", spec = spec) + +structs = spec.query["amqp/struct"] + \ + spec.query["amqp/class/struct", excludes] + \ + spec.query["amqp/class/command/result/struct", excludes] +controls = spec.query["amqp/class/control", excludes] +commands = spec.query["amqp/class/command", excludes] + +composites = structs + controls + commands + +for c in composites: + name = cname(c) + execute("%s.java" % name, "Composite.tpl", type = c, name = name) + +execute("MethodDelegate.java", "MethodDelegate.tpl", composites = composites) +execute("Option.java", "Option.tpl", composites = composites) +execute("Invoker.java", "Invoker.tpl", composites = controls + commands) +execute("StructFactory.java", "StructFactory.tpl", composites = composites) + +def is_enum(nd): + return nd["enum"] is not None + +enums = spec.query["amqp/domain", is_enum] + \ + spec.query["amqp/class/domain", is_enum] + +for e in enums: + name = cname(e) + execute("%s.java" % name, "Enum.tpl", name = name, type = e) diff --git a/java/common/generate b/java/common/generate deleted file mode 100755 index daf8475c54..0000000000 --- a/java/common/generate +++ /dev/null @@ -1,567 +0,0 @@ -#!/usr/bin/python - -# Interim code generation script. - -import sys, os, mllib -from cStringIO import StringIO - -out_dir=sys.argv[1] -out_pkg = sys.argv[2] -spec_file = sys.argv[3] - -spec = mllib.xml_parse(spec_file) - -def jbool(b): - if b: - return "true" - else: - return "false" - -class Output: - - def __init__(self, dir, package, name): - self.dir = dir - self.package = package - self.name = name - self.lines = [] - - self.line("package %s;" % self.package) - self.line() - self.line("import java.util.ArrayList;") - self.line("import java.util.List;") - self.line("import java.util.Map;") - self.line("import java.util.UUID;") - self.line() - self.line("import org.apache.qpidity.transport.codec.Decoder;") - self.line("import org.apache.qpidity.transport.codec.Encodable;") - self.line("import org.apache.qpidity.transport.codec.Encoder;") - self.line() - self.line("import org.apache.qpidity.transport.network.Frame;") - self.line() - self.line() - - def line(self, l = ""): - self.lines.append(l) - - def getter(self, type, method, value, pre = None): - self.line() - self.line(" public final %s %s() {" % (type, method)) - if pre: - self.line(" %s;" % pre) - self.line(" return %s;" % value) - self.line(" }") - - def setter(self, type, method, variable, value = None, pre = None, - post = None): - if value: - params = "" - else: - params = "%s value" % type - value = "value" - - self.line() - self.line(" public final %s %s(%s) {" % (self.name, method, params)) - if pre: - self.line(" %s;" % pre) - self.line(" this.%s = %s;" % (variable, value)) - if post: - self.line(" %s;" % post) - self.line(" return this;") - self.line(" }") - - def write(self): - dir = os.path.join(self.dir, *self.package.split(".")) - if not os.path.exists(dir): - os.makedirs(dir) - file = os.path.join(dir, "%s.java" % self.name) - out = open(file, "w") - for l in self.lines: - out.write(l) - out.write(os.linesep) - out.close() - -TYPES = { - "longstr": "String", - "shortstr": "String", - "longlong": "long", - "long": "long", - "short": "int", - "octet": "short", - "bit": "boolean", - "table": "Map<String,Object>", - "timestamp": "long", - "content": "String", - "uuid": "UUID", - "rfc1982-long-set": "RangeSet", - "long-struct": "Struct", - "signed-byte": "byte", - "unsigned-byte": "short", - "char": "char", - "boolean": "boolean", - "two-octets": "short", - "signed-short": "short", - "unsigned-short": "int", - "four-octets": "int", - "signed-int": "int", - "unsigned-int": "long", - "float": "float", - "utf32-char": "char", - "eight-octets": "long", - "signed-long": "long", - "unsigned-long": "long", - "double": "double", - "datetime": "long", - "sixteen-octets": "byte[]", - "thirty-two-octets": "byte[]", - "sixty-four-octets": "byte[]", - "_128-octets": "byte[]", - "short-binary": "byte[]", - "short-string": "String", - "short-utf8-string": "String", - "short-utf16-string": "String", - "short-utf32-string": "String", - "binary": "byte[]", - "string": "String", - "utf8-string": "String", - "utf16-string": "String", - "utf32-string": "String", - "long-binary": "byte[]", - "long-string": "String", - "long-utf8-string": "String", - "long-utf16-string": "String", - "long-utf32-string": "String", - "sequence": "List<Object>", - "array": "List<Object>", - "five-octets": "byte[]", - "decimal": "byte[]", - "nine-octets": "byte[]", - "long-decimal": "byte[]", - "void": "Void" - } - -DEFAULTS = { - "longlong": "0", - "long": "0", - "short": "0", - "octet": "0", - "timestamp": "0", - "bit": "false" - } - -TRACKS = { - "connection": "Frame.L1", - "session": "Frame.L2", - "execution": "Frame.L3", - None: None - } - -def camel(offset, *args): - parts = [] - for a in args: - parts.extend(a.split("-")) - return "".join(parts[:offset] + [p[0].upper() + p[1:] for p in parts[offset:]]) - -def dromedary(s): - return s[0].lower() + s[1:] - -def scream(*args): - return "_".join([a.replace("-", "_").upper() for a in args]) - - -types = Output(out_dir, out_pkg, "Type") -types.line("public enum Type") -types.line("{") -codes = {} -for c in spec.query["amqp/constant"]: - if c["@class"] == "field-table-type": - name = c["@name"] - if name.startswith("field-table-"): - name = name[12:] - if name[0].isdigit(): - name = "_" + name - val = c["@value"] - codes[val] = name - if c["@width"] != None: - width = c["@width"] - fixed = "true" - if c["@lfwidth"] != None: - width = c["@lfwidth"] - fixed = "false" - types.line(" %s((byte) %s, %s, %s)," % - (scream(name), val, width, fixed)) -types.line(" ;") - -types.line(" public byte code;") -types.line(" public int width;") -types.line(" public boolean fixed;") - -types.line(" Type(byte code, int width, boolean fixed)") -types.line(" {") -for arg in ("code", "width", "fixed"): - types.line(" this.%s = %s;" % (arg, arg)) -types.line(" }") - -types.line(" public static Type get(byte code)") -types.line(" {") -types.line(" switch (code)") -types.line(" {") -for code, name in codes.items(): - types.line(" case (byte) %s: return %s;" % (code, scream(name))) -types.line(" default: return null;") -types.line(" }") -types.line(" }") - -types.line("}") -types.write() - - -const = Output(out_dir, out_pkg, "Constant") -const.line("public interface Constant") -const.line("{") -for d in spec.query["amqp/constant"]: - name = d["@name"] - val = d["@value"] - datatype = d["@datatype"] - if datatype == None: - const.line("public static final int %s = %s;" % (scream(name), val)) -const.line("}") -const.write() - - -DOMAINS = {} -STRUCTS = {} - -for d in spec.query["amqp/domain"]: - name = d["@name"] - type = d["@type"] - if type != None: - DOMAINS[name] = d["@type"] - elif d["struct"] != None: - DOMAINS[name] = name - STRUCTS[name] = camel(0, name) - -def resolve(type): - if DOMAINS.has_key(type) and DOMAINS[type] != type: - return resolve(DOMAINS[type]) - else: - return type - -def jtype(type): - if STRUCTS.has_key(type): - return STRUCTS[type] - else: - return TYPES[type] - -def jclass(jt): - idx = jt.find('<') - if idx > 0: - return jt[:idx] - else: - return jt - -REFS = { - "boolean": "Boolean", - "byte": "Byte", - "short": "Short", - "int": "Integer", - "long": "Long", - "float": "Float", - "double": "Double", - "char": "Character" -} - -def jref(jt): - return REFS.get(jt, jt) - - -OPTIONS = {} - -class Struct: - - def __init__(self, node, name, base, type, size, pack, track, content): - self.node = node - self.name = name - self.base = base - self.type = type - self.size = size - self.pack = pack - self.track = track - self.content = content - self.fields = [] - self.ticket = False - - def result(self): - r = self.node["result"] - if not r: return - name = r["@domain"] - if not name: - name = self.name + "Result" - else: - name = camel(0, name) - return name - - def field(self, type, name): - if name == "ticket": - self.ticket = True - else: - self.fields.append((type, name)) - - def impl(self, out): - out.line("public class %s extends %s {" % (self.name, self.base)) - - out.line() - out.line(" public static final int TYPE = %d;" % self.type) - out.getter("int", "getStructType", "TYPE") - out.getter("int", "getSizeWidth", self.size) - out.getter("int", "getPackWidth", self.pack) - out.getter("boolean", "hasTicket", jbool(self.ticket)) - - if self.base == "Method": - out.getter("boolean", "hasPayload", jbool(self.content)) - out.getter("byte", "getEncodedTrack", self.track) - - out.line() - out.line(" private static final List<Field<?,?>> FIELDS = new ArrayList<Field<?,?>>();") - out.line(" public List<Field<?,?>> getFields() { return FIELDS; }") - out.line() - - out.line() - for type, name in self.fields: - out.line(" private boolean has_%s;" % name) - out.line(" private %s %s;" % (jtype(type), name)) - - if self.fields: - out.line() - out.line(" public %s() {}" % self.name) - - out.line() - out.line(" public %s(%s) {" % (self.name, self.parameters())) - opts = False - for type, name in self.fields: - if not OPTIONS.has_key(name): - out.line(" %s(%s);" % (camel(1, "set", name), name)) - else: - opts = True - if opts: - for type, name in self.fields: - if OPTIONS.has_key(name): - out.line(" boolean _%s = false;" % name) - out.line(" for (int i=0; i < _options.length; i++) {") - out.line(" switch (_options[i]) {") - for type, name in self.fields: - if OPTIONS.has_key(name): - out.line(" case %s: _%s=true; break;" % (OPTIONS[name], name)) - out.line(" case NO_OPTION: break;") - out.line(' default: throw new IllegalArgumentException' - '("invalid option: " + _options[i]);') - out.line(" }") - out.line(" }") - for type, name in self.fields: - if OPTIONS.has_key(name): - out.line(" %s(_%s);" % (camel(1, "set", name), name)) - out.line(" }") - - out.line() - out.line(" public <C> void dispatch(C context, MethodDelegate<C> delegate) {") - out.line(" delegate.%s(context, this);" % dromedary(self.name)) - out.line(" }") - - index = 0 - for type, name in self.fields: - out.getter("boolean", camel(1, "has", name), "has_" + name) - out.setter("boolean", camel(1, "clear", name), "has_" + name, "false", - post = "this.%s = %s; this.dirty = true" % (name, DEFAULTS.get(type, "null"))) - out.getter(jtype(type), camel(1, "get", name), name) - for mname in (camel(1, "set", name), name): - out.setter(jtype(type), mname, name, - post = "this.has_%s = true; this.dirty = true" % name) - - out.line() - out.line(' static {') - ftype = jref(jclass(jtype(type))) - out.line(' FIELDS.add(new Field<%s,%s>(%s.class, %s.class, "%s", %d) {' % - (self.name, ftype, self.name, ftype, name, index)) - out.line(' public boolean has(Object struct) {') - out.line(' return check(struct).has_%s;' % name) - out.line(' }') - out.line(' public void has(Object struct, boolean value) {') - out.line(' check(struct).has_%s = value;' % name) - out.line(' }') - out.line(' public %s get(Object struct) {' % ftype) - out.line(' return check(struct).%s();' % camel(1, "get", name)) - out.line(' }') - out.line(' public void read(Decoder dec, Object struct) {') - if TYPES.has_key(type): - out.line(' check(struct).%s = dec.read%s();' % (name, camel(0, type))) - elif STRUCTS.has_key(type): - out.line(' check(struct).%s = (%s) dec.readStruct(%s.TYPE);' % - (name, STRUCTS[type], STRUCTS[type])) - else: - raise Exception("unknown type: %s" % type) - out.line(' check(struct).dirty = true;') - out.line(' }') - out.line(' public void write(Encoder enc, Object struct) {') - if TYPES.has_key(type): - out.line(' enc.write%s(check(struct).%s);' % (camel(0, type), name)) - elif STRUCTS.has_key(type): - out.line(' enc.writeStruct(%s.TYPE, check(struct).%s);' % - (STRUCTS[type], name)) - else: - raise Exception("unknown type: %s" % type) - out.line(' }') - out.line(' });') - out.line(' }') - index += 1; - - out.line("}") - - - def parameters(self): - params = [] - var = False - for type, name in self.fields: - if OPTIONS.has_key(name): - var = True - else: - params.append("%s %s" % (jtype(type), name)) - if var: - params.append("Option ... _options") - return ", ".join(params) - - def arguments(self): - args = [] - var = False - for type, name in self.fields: - if OPTIONS.has_key(name): - var = True - else: - args.append(name) - if var: - args.append("_options") - return ", ".join(args) - -CLASSES = {"file": False, "basic": False, "stream": False, "tunnel": False} - -PACK_WIDTHS = { - None: 2, - "octet": 1, - "short": 2, - "long": 4 - } - -SIZE_WIDTHS = PACK_WIDTHS.copy() -SIZE_WIDTHS[None] = 0 - -class Visitor(mllib.transforms.Visitor): - - def __init__(self): - self.structs = [] - self.untyped = -1 - - def do_method(self, m): - if CLASSES.get(m.parent["@name"], True): - name = camel(0, m.parent["@name"], m["@name"]) - type = int(m.parent["@index"])*256 + int(m["@index"]) - self.structs.append((name, "Method", type, 0, 2, m)) - self.descend(m) - - def do_domain(self, d): - s = d["struct"] - if s: - name = camel(0, d["@name"]) - st = s["@type"] - if st in (None, "none", ""): - type = self.untyped - self.untyped -= 1 - else: - type = int(st) - self.structs.append((name, "Struct", type, SIZE_WIDTHS[s["@size"]], - PACK_WIDTHS[s["@pack"]], s)) - self.descend(d) - - def do_result(self, r): - s = r["struct"] - if s: - name = camel(0, r.parent.parent["@name"], r.parent["@name"], "Result") - type = int(r.parent.parent["@index"]) * 256 + int(s["@type"]) - self.structs.append((name, "Result", type, SIZE_WIDTHS[s["@size"]], - PACK_WIDTHS[s["@pack"]], s)) - self.descend(r) - -v = Visitor() -spec.dispatch(v) - -opts = Output(out_dir, out_pkg, "Option") -opts.line("public enum Option {") -structs = [] -for name, base, typecode, size, pack, m in v.structs: - struct = Struct(m, name, base, typecode, size, pack, - TRACKS.get(m.parent["@name"], "Frame.L4"), - m["@content"] == "1") - for f in m.query["field"]: - type = resolve(f["@domain"]) - name = camel(1, f["@name"]) - struct.field(type, name) - if type == "bit": - opt_name = scream(f["@name"]) - if not OPTIONS.has_key(name): - OPTIONS[name] = opt_name - opts.line(" %s," % opt_name) - structs.append(struct) -opts.line(" %s," % "NO_OPTION") -opts.line("}") -opts.write() - - - - -for s in structs: - impl = Output(out_dir, out_pkg, s.name) - s.impl(impl) - impl.write() - -fct = Output(out_dir, out_pkg, "StructFactory") -fct.line("class StructFactory {") -fct.line(" public static Struct create(int type) {") -fct.line(" switch (type) {") -for s in structs: - fct.line(" case %s.TYPE:" % s.name) - fct.line(" return new %s();" % s.name) -fct.line(" default:") -fct.line(' throw new IllegalArgumentException("type: " + type);') -fct.line(" }") -fct.line(" }") -fct.line("}"); -fct.write() - -dlg = Output(out_dir, out_pkg, "MethodDelegate") -dlg.line("public abstract class MethodDelegate<C> {") -for s in structs: - dlg.line(" public void %s(C context, %s struct) {}" % - (dromedary(s.name), s.name)) -dlg.line("}") -dlg.write() - -inv = Output(out_dir, out_pkg, "Invoker") -inv.line("public abstract class Invoker {") -inv.line() -inv.line(" protected abstract void invoke(Method method);") -inv.line(" protected abstract <T> Future<T> invoke(Method method, Class<T> resultClass);") -inv.line() -for s in structs: - if s.base != "Method": continue - dname = dromedary(s.name) - result = s.result() - if result: - result_type = "Future<%s>" % result - else: - result_type = "void" - inv.line(" public %s %s(%s) {" % (result_type, dname, s.parameters())) - if result: - inv.line(" return invoke(new %s(%s), %s.class);" % - (s.name, s.arguments(), result)) - else: - inv.line(" invoke(new %s(%s));" % (s.name, s.arguments())) - inv.line(" }") -inv.line("}") -inv.write() diff --git a/java/common/genutil.py b/java/common/genutil.py new file mode 100644 index 0000000000..5206b50bbd --- /dev/null +++ b/java/common/genutil.py @@ -0,0 +1,207 @@ + +def camel(offset, *args): + parts = [] + for a in args: + parts.extend(a.split("-")) + return "".join(parts[:offset] + [p[0].upper() + p[1:] for p in parts[offset:]]) + +def dromedary(s): + return s[0].lower() + s[1:] + +def scream(*args): + return "_".join([a.replace("-", "_").upper() for a in args]) + +def num(x): + if x is not None and x != "": + return int(x, 0) + else: + return None + +def klass(nd): + parent = nd.parent + while parent is not None: + if hasattr(parent, "name") and parent.name == "class": + return parent + parent = parent.parent + +untyped = -1 + +def code(nd): + global untyped + cd = num(nd["@code"]) + if cd is None: + cd = untyped + untyped -= 1 + return cd + + cls = klass(nd) + if cls: + cd |= (num(cls["@code"]) << 8) + return cd + +def root(nd): + if nd.parent is None: + return nd + else: + return root(nd.parent) + +def qname(nd): + name = nd["@name"] + cls = klass(nd) + if cls != None: + return "%s.%s" % (cls["@name"], name) + else: + return name + +def resolve(node, name): + spec = root(node) + cls = klass(node) + if cls: + for nd in cls.query["#tag"]: + if nd["@name"] == name: + return nd + for nd in spec.query["amqp/#tag"] + spec.query["amqp/class/#tag"]: + if name == qname(nd): + return nd + raise Exception("unresolved name: %s" % name) + +def resolve_type(nd): + name = nd["@type"] + type = resolve(nd, name) + if type.name == "domain" and not type["enum"]: + return resolve_type(type) + else: + return type + +TYPES = { + "bit": "boolean", + "uint8": "short", + "uint16": "int", + "uint32": "long", + "uint64": "long", + "datetime": "long", + "uuid": "UUID", + "sequence-no": "int", + "sequence-set": "RangeSet", # XXX + "byte-ranges": "RangeSet", # XXX + "str8": "String", + "str16": "String", + "vbin8": "byte[]", + "vbin16": "byte[]", + "vbin32": "byte[]", + "struct32": "Struct", + "map": "Map<String,Object>", + "array": "List<Object>" + } + +def cname(nd, field="@name"): + cls = klass(nd) + if cls: + if (nd.name in ("struct", "result") and + cls["@name"] != "session" and + nd[field] != "header"): + return camel(0, nd[field]) + else: + return camel(0, cls["@name"], nd[field]) + else: + return camel(0, nd[field]) + +def jtype(nd): + if nd.name == "struct" or nd["enum"]: + return cname(nd) + else: + return TYPES[nd["@name"]] + +REFS = { + "boolean": "Boolean", + "byte": "Byte", + "short": "Short", + "int": "Integer", + "long": "Long", + "float": "Float", + "double": "Double", + "char": "Character" +} + +def jref(jt): + return REFS.get(jt, jt) + +def jclass(jt): + idx = jt.find('<') + if idx > 0: + return jt[:idx] + else: + return jt + +DEFAULTS = { + "long": 0, + "int": 0, + "short": 0, + "byte": 0, + "char": 0, + "boolean": "false" + } + +class Field: + + def __init__(self, index, nd): + self.index = index + self.name = camel(1, nd["@name"]) + type_node = resolve_type(nd) + tname = cname(type_node) + if type_node.name == "struct": + self.read = "(%s) dec.readStruct(%s.TYPE)" % (tname, tname) + self.write = "enc.writeStruct(%s.TYPE, check(struct).%s)" % (tname, self.name) + elif type_node.name == "domain": + coder = camel(0, resolve_type(type_node)["@name"]) + self.read = "%s.get(dec.read%s())" % (tname, coder) + self.write = "enc.write%s(check(struct).%s.getValue())" % (coder, self.name) + else: + coder = camel(0, type_node["@name"]) + self.read = "dec.read%s()" % coder + self.write = "enc.write%s(check(struct).%s)" % (coder, self.name) + self.type = jtype(type_node) + self.default = DEFAULTS.get(self.type, "null") + self.has = camel(1, "has", self.name) + self.get = camel(1, "get", self.name) + self.set = camel(1, "set", self.name) + self.clear = camel(1, "clear", self.name) + if self.type == "boolean": + self.option = scream(nd["@name"]) + else: + self.option = None + +def get_fields(nd): + fields = [] + index = 0 + for f in nd.query["field"]: + fields.append(Field(index, f)) + index += 1 + return fields + +def get_parameters(fields): + params = [] + options = False + for f in fields: + if f.option: + options = True + else: + params.append("%s %s" % (f.type, f.name)) + if options: + params.append("Option ... _options") + return params + +def get_arguments(fields): + args = [] + options = False + for f in fields: + if f.option: + options = True + else: + args.append(f.name) + if options: + args.append("_options") + return args + +def get_options(fields): + return [f for f in fields if f.option] diff --git a/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java b/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java index d0b3272dec..a72997813a 100644 --- a/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java +++ b/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java @@ -22,6 +22,7 @@ package org.apache.qpidity; import java.io.UnsupportedEncodingException; import java.util.HashSet; +import java.util.List; import java.util.StringTokenizer; import org.apache.qpidity.security.AMQPCallbackHandler; @@ -29,13 +30,12 @@ import org.apache.qpidity.security.CallbackHandlerRegistry; public class SecurityHelper { - public static String chooseMechanism(String mechanisms) throws UnsupportedEncodingException + public static String chooseMechanism(List<Object> mechanisms) throws UnsupportedEncodingException { - StringTokenizer tokenizer = new StringTokenizer(mechanisms, " "); HashSet mechanismSet = new HashSet(); - while (tokenizer.hasMoreTokens()) + for (Object m : mechanisms) { - mechanismSet.add(tokenizer.nextToken()); + mechanismSet.add(m); } String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms(); diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java index 2bd97f3aff..5f9917e30a 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java @@ -59,7 +59,7 @@ class ToyBroker extends SessionDelegate public void messageAcquire(Session context, MessageAcquire struct) { System.out.println("\n==================> messageAcquire " ); - context.messageAcquired(struct.getTransfers()); + context.executionResult((int) struct.getId(), new Acquired(struct.getTransfers())); } @Override public void queueDeclare(Session ssn, QueueDeclare qd) @@ -68,16 +68,16 @@ class ToyBroker extends SessionDelegate System.out.println("\n==================> declared queue: " + qd.getQueue() + "\n"); } - @Override public void queueBind(Session ssn, QueueBind qb) + @Override public void exchangeBind(Session ssn, ExchangeBind qb) { - exchange.bindQueue(qb.getExchange(), qb.getRoutingKey(),qb.getQueue()); - System.out.println("\n==================> bound queue: " + qb.getQueue() + " with routing key " + qb.getRoutingKey() + "\n"); + exchange.bindQueue(qb.getExchange(), qb.getBindingKey(),qb.getQueue()); + System.out.println("\n==================> bound queue: " + qb.getQueue() + " with binding key " + qb.getBindingKey() + "\n"); } @Override public void queueQuery(Session ssn, QueueQuery qq) { QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue()); - ssn.executionResult(qq.getId(), result); + ssn.executionResult((int) qq.getId(), result); } @Override public void messageSubscribe(Session ssn, MessageSubscribe ms) @@ -112,7 +112,8 @@ class ToyBroker extends SessionDelegate { if (xfr == null || body == null) { - ssn.connectionClose(503, "no method segment", 0, 0); + ssn.connectionClose(ConnectionCloseCode.FRAMING_ERROR, + "no method segment"); ssn.close(); return; } @@ -136,7 +137,7 @@ class ToyBroker extends SessionDelegate { if (xfr == null || body == null) { - ssn.connectionClose(503, "no method segment", 0, 0); + ssn.connectionClose(ConnectionCloseCode.FRAMING_ERROR, "no method segment"); ssn.close(); return; } @@ -174,14 +175,16 @@ class ToyBroker extends SessionDelegate { RangeSet ranges = new RangeSet(); ranges.add(xfr.getId()); - ssn.messageReject(ranges, 0, "no such destination"); + ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE, + "no such destination"); } } private void transferMessageToPeer(Session ssn,String dest, Message m) { System.out.println("\n==================> Transfering message to: " +dest + "\n"); - ssn.messageTransfer(dest, (short)0, (short)0); + ssn.messageTransfer(dest, MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED); ssn.header(m.header); for (Data d : m.body) { diff --git a/java/common/src/main/java/org/apache/qpidity/ToyClient.java b/java/common/src/main/java/org/apache/qpidity/ToyClient.java index 10b68bbb20..a3233afcbe 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyClient.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyClient.java @@ -63,7 +63,7 @@ class ToyClient extends SessionDelegate public static final void main(String[] args) { Connection conn = MinaHandler.connect("0.0.0.0", 5672, - new ConnectionDelegate() + new ClientDelegate() { public SessionDelegate getSessionDelegate() { @@ -80,9 +80,9 @@ class ToyClient extends SessionDelegate TransportConstants.getVersionMinor()))); Channel ch = conn.getChannel(0); - Session ssn = new Session(); + Session ssn = new Session("my-session".getBytes()); ssn.attach(ch); - ssn.sessionOpen(1234); + ssn.sessionAttach(ssn.getName()); ssn.queueDeclare("asdf", null, null); ssn.sync(); @@ -111,13 +111,15 @@ class ToyClient extends SessionDelegate map.put("list", Arrays.asList(1, 2, 3)); map.put("binary", new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}); - ssn.messageTransfer("asdf", (short) 0, (short) 1); + ssn.messageTransfer("asdf", MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED); ssn.header(new DeliveryProperties(), new MessageProperties().setApplicationHeaders(map)); ssn.data("this is the data"); ssn.endData(); - ssn.messageTransfer("fdsa", (short) 0, (short) 1); + ssn.messageTransfer("fdsa", MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED); ssn.data("this should be rejected"); ssn.endData(); ssn.sync(); diff --git a/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java b/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java index 224917ade1..89d7e7917f 100644 --- a/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java +++ b/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java @@ -241,28 +241,10 @@ public class XidImpl implements Xid * @return The String representation of this Xid * @throws QpidException In case of problem when converting this Xid into a string. */ - public static String convertToString(Xid xid) throws QpidException + public static org.apache.qpidity.transport.Xid convert(Xid xid) throws QpidException { - if (_logger.isDebugEnabled()) - { - _logger.debug("converting " + xid + " into a String"); - } - try - { - ByteArrayOutputStream res = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputStream(res); - out.writeLong(xid.getFormatId()); - byte[] txId = xid.getGlobalTransactionId(); - byte[] brId = xid.getBranchQualifier(); - out.writeByte(txId.length); - out.writeByte(brId.length); - out.write(txId); - out.write(brId); - return res.toString(); - } - catch (IOException e) - { - throw new QpidException("cannot convert the xid " + xid + " into a String", null, e); - } + return new org.apache.qpidity.transport.Xid(xid.getFormatId(), + xid.getGlobalTransactionId(), + xid.getBranchQualifier()); } } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Channel.java b/java/common/src/main/java/org/apache/qpidity/transport/Channel.java index 7327697088..651402bcb7 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Channel.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Channel.java @@ -77,7 +77,7 @@ public class Channel extends Invoker connection.getConnectionDelegate().init(this, hdr); } - public void method(Void v, Method method) + public void control(Void v, Method method) { switch (method.getEncodedTrack()) { @@ -90,15 +90,17 @@ public class Channel extends Invoker case L3: method.delegate(session, sessionDelegate); break; - case L4: - method.delegate(session, sessionDelegate); - break; default: throw new IllegalStateException ("unknown track: " + method.getEncodedTrack()); } } + public void command(Void v, Method method) + { + method.delegate(session, sessionDelegate); + } + public void header(Void v, Header header) { header.delegate(session, sessionDelegate); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java index 84621fbe25..7f4365f515 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java @@ -32,15 +32,14 @@ import java.util.UUID; class ChannelDelegate extends MethodDelegate<Channel> { - public @Override void sessionOpen(Channel channel, SessionOpen open) + public @Override void sessionAttach(Channel channel, SessionAttach atch) { - Session ssn = new Session(); + Session ssn = new Session(atch.getName()); ssn.attach(channel); - long lifetime = open.getDetachedLifetime(); - ssn.sessionAttached(UUID.randomUUID(), lifetime); + ssn.sessionAttached(ssn.getName()); } - public @Override void sessionClosed(Channel channel, SessionClosed closed) + public @Override void sessionDetached(Channel channel, SessionDetached closed) { channel.getSession().closed(); // XXX: should we remove the channel from the connection? It diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java new file mode 100644 index 0000000000..699854fb3b --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + + +/** + * ClientDelegate + * + */ + +public abstract class ClientDelegate extends ConnectionDelegate +{ + + public void init(Channel ch, ProtocolHeader hdr) + { + if (hdr.getMajor() != TransportConstants.getVersionMajor() && + hdr.getMinor() != TransportConstants.getVersionMinor()) + { + throw new RuntimeException("version missmatch: " + hdr); + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java index 4815f1025f..cb5f05a185 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java @@ -26,7 +26,10 @@ import org.apache.qpidity.SecurityHelper; import org.apache.qpidity.QpidException; import java.io.UnsupportedEncodingException; + +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -79,17 +82,27 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> public void init(Channel ch, ProtocolHeader hdr) { - // XXX: hardcoded version - if (hdr.getMajor() != 0 && hdr.getMinor() != 10) + ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader + (1, + TransportConstants.getVersionMajor(), + TransportConstants.getVersionMinor()))); + if (hdr.getMajor() != TransportConstants.getVersionMajor() && + hdr.getMinor() != TransportConstants.getVersionMinor()) { // XXX - ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader(1, TransportConstants.getVersionMajor(), - TransportConstants.getVersionMinor()))); + ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader + (1, + TransportConstants.getVersionMajor(), + TransportConstants.getVersionMinor()))); ch.getConnection().close(); } else { - ch.connectionStart(hdr.getMajor(), hdr.getMinor(), null, "PLAIN", "utf8"); + List<Object> plain = new ArrayList<Object>(); + plain.add("PLAIN"); + List<Object> utf8 = new ArrayList<Object>(); + utf8.add("utf8"); + ch.connectionStart(null, plain, utf8); } } @@ -99,13 +112,13 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> @Override public void connectionStart(Channel context, ConnectionStart struct) { String mechanism = null; - String response = null; + byte[] response = null; try { mechanism = SecurityHelper.chooseMechanism(struct.getMechanisms()); saslClient = Sasl.createSaslClient(new String[]{ mechanism },null, "AMQP", "localhost", null, SecurityHelper.createCallbackHandler(mechanism,_username,_password )); - response = new String(saslClient.evaluateChallenge(new byte[0]),_locale); + response = saslClient.evaluateChallenge(new byte[0]); } catch (UnsupportedEncodingException e) { @@ -128,13 +141,9 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> { try { - String response = new String(saslClient.evaluateChallenge(struct.getChallenge().getBytes()),_locale); + byte[] response = saslClient.evaluateChallenge(struct.getChallenge()); context.connectionSecureOk(response); } - catch (UnsupportedEncodingException e) - { - // need error handling - } catch (SaslException e) { // need error handling @@ -144,14 +153,14 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> @Override public void connectionTune(Channel context, ConnectionTune struct) { // should update the channel max given by the broker. - context.connectionTuneOk(struct.getChannelMax(), struct.getFrameMax(), struct.getHeartbeat()); + context.connectionTuneOk(struct.getChannelMax(), struct.getMaxFrameSize(), struct.getHeartbeatMax()); context.connectionOpen(_virtualHost, null, Option.INSIST); } @Override public void connectionOpenOk(Channel context, ConnectionOpenOk struct) { - String knownHosts = struct.getKnownHosts(); + List<Object> knownHosts = struct.getKnownHosts(); if(_negotiationCompleteLock != null) { _negotiationCompleteLock.lock(); @@ -187,13 +196,13 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> byte[] challenge = null; if ( challenge == null) { - context.connectionTune(Integer.MAX_VALUE,maxFrame, 0); + context.connectionTune(Integer.MAX_VALUE, maxFrame, 0, Integer.MAX_VALUE); } else { try { - context.connectionSecure(new String(challenge,_locale)); + context.connectionSecure(challenge); } catch(Exception e) { @@ -218,16 +227,16 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> try { saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",new HashMap(),SecurityHelper.createCallbackHandler(_mechanism,_username,_password)); - byte[] challenge = saslServer.evaluateResponse(struct.getResponse().getBytes()); + byte[] challenge = saslServer.evaluateResponse(struct.getResponse()); if ( challenge == null) { - context.connectionTune(Integer.MAX_VALUE,maxFrame, 0); + context.connectionTune(Integer.MAX_VALUE, maxFrame, 0, Integer.MAX_VALUE); } else { try { - context.connectionSecure(new String(challenge,_locale)); + context.connectionSecure(challenge); } catch(Exception e) { @@ -250,8 +259,9 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> @Override public void connectionOpen(Channel context, ConnectionOpen struct) { - String hosts = "amqp:1223243232325"; - context.connectionOpenOk(hosts); + List<Object> hosts = new ArrayList<Object>(); + hosts.add("amqp:1223243232325"); + context.connectionOpenOk(hosts); } public String getPassword() diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Data.java b/java/common/src/main/java/org/apache/qpidity/transport/Data.java index b9b8636d18..72c1c3331d 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Data.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Data.java @@ -93,7 +93,7 @@ public class Data implements ProtocolEvent { str.append(" | "); } - str.append(str(buf, 20)); + str.append(str(buf)); } str.append(")"); return str.toString(); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Method.java b/java/common/src/main/java/org/apache/qpidity/transport/Method.java index 7304260333..bc4ec6289d 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Method.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Method.java @@ -20,6 +20,7 @@ */ package org.apache.qpidity.transport; +import org.apache.qpidity.transport.network.Frame; /** * Method @@ -34,11 +35,12 @@ public abstract class Method extends Struct implements ProtocolEvent { // XXX: should generate separate factories for separate // namespaces - return (Method) Struct.create(type); + return (Method) StructFactory.createInstruction(type); } // XXX: command subclass? private long id; + private boolean sync = false; public final long getId() { @@ -50,6 +52,16 @@ public abstract class Method extends Struct implements ProtocolEvent this.id = id; } + public final boolean isSync() + { + return sync; + } + + void setSync(boolean value) + { + this.sync = value; + } + public abstract boolean hasPayload(); public abstract byte getEncodedTrack(); @@ -58,7 +70,14 @@ public abstract class Method extends Struct implements ProtocolEvent public <C> void delegate(C context, ProtocolDelegate<C> delegate) { - delegate.method(context, this); + if (getEncodedTrack() == Frame.L4) + { + delegate.command(context, this); + } + else + { + delegate.control(context, this); + } } } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java index 6149c3876a..028d570416 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java @@ -31,7 +31,9 @@ public interface ProtocolDelegate<C> void init(C context, ProtocolHeader header); - void method(C context, Method method); + void control(C context, Method control); + + void command(C context, Method command); void header(C context, Header header); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java b/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java index 97d4be0d2e..ccbcfa99d0 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java @@ -98,6 +98,13 @@ public class RangeSet implements Iterable<Range> ranges.clear(); } + public RangeSet copy() + { + RangeSet copy = new RangeSet(); + copy.ranges.addAll(ranges); + return copy; + } + public String toString() { StringBuffer str = new StringBuffer(); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/java/common/src/main/java/org/apache/qpidity/transport/Session.java index e4f4af95a5..f6f4062c6d 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Session.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.qpidity.transport.Option.*; /** * Session @@ -56,23 +57,34 @@ public class Session extends Invoker private static boolean ENABLE_REPLAY = false; private static final Logger log = Logger.get(Session.class); + private byte[] name; + private long timeout = 60000; + // channel may be null Channel channel; // incoming command count - private long commandsIn = 0; + long commandsIn = 0; // completed incoming commands private final RangeSet processed = new RangeSet(); - private long processedMark = -1; private Range syncPoint = null; // outgoing command count private long commandsOut = 0; private Map<Long,Method> commands = new HashMap<Long,Method>(); - private long mark = 0; + private long maxComplete = -1; private AtomicBoolean closed = new AtomicBoolean(false); + public Session(byte[] name) + { + this.name = name; + } + + public byte[] getName() + { + return name; + } public Map<Long,Method> getOutstandingCommands() { @@ -133,25 +145,12 @@ public class Session extends Invoker public void flushProcessed() { - boolean first = true; - RangeSet rest = new RangeSet(); + RangeSet copy; synchronized (processed) { - for (Range r: processed) - { - if (first && r.includes(processedMark)) - { - processedMark = r.getUpper(); - } - else - { - rest.add(r); - } - - first = false; - } + copy = processed.copy(); } - executionComplete(processedMark, rest); + sessionCompleted(copy); } void syncPoint() @@ -193,34 +192,34 @@ public class Session extends Invoker log.debug("%s complete(%d, %d)", this, lower, upper); synchronized (commands) { - for (long id = lower; id <= upper; id++) + for (long id = maxComplete; id <= upper; id++) { commands.remove(id); } + if (lower <= maxComplete + 1) + { + maxComplete = Math.max(maxComplete, upper); + } commands.notifyAll(); log.debug("%s commands remaining: %s", this, commands); } } - void complete(long mark) - { - synchronized (commands) - { - complete(this.mark, mark); - this.mark = mark; - commands.notifyAll(); - } - } - protected void invoke(Method m) { if (m.getEncodedTrack() == Frame.L4) { synchronized (commands) { - // You only need to keep the command if you need command level replay. - // If not we only need to keep track of commands to make sync work - commands.put(commandsOut++,(ENABLE_REPLAY?m:null)); + long next = commandsOut++; + if (next == 0) + { + sessionCommandPoint(0, 0); + } + if (ENABLE_REPLAY) + { + commands.put(next, m); + } channel.method(m); } } @@ -230,7 +229,7 @@ public class Session extends Invoker } } - public void header(Header header) + public void header(Header header) { channel.header(header); } @@ -269,21 +268,32 @@ public class Session extends Invoker public void sync() { + sync(timeout); + } + + public void sync(long timeout) + { log.debug("%s sync()", this); synchronized (commands) { long point = commandsOut - 1; - if (mark < point) + if (maxComplete < point) { - executionSync(); + ExecutionSync sync = new ExecutionSync(); + sync.setSync(true); + invoke(sync); } - while (!closed.get() && mark < point) + long start = System.currentTimeMillis(); + long elapsed = 0; + while (!closed.get() && elapsed < timeout && maxComplete < point) { try { - log.debug("%s waiting for[%d]: %s", this, point, commands); - commands.wait(); + log.debug("%s waiting for[%d]: %d, %s", this, point, + maxComplete, commands); + commands.wait(timeout - elapsed); + elapsed = System.currentTimeMillis() - start; } catch (InterruptedException e) { @@ -291,9 +301,16 @@ public class Session extends Invoker } } - if (mark < point) + if (maxComplete < point) { - throw new RuntimeException("session closed"); + if (closed.get()) + { + throw new RuntimeException("session closed"); + } + else + { + throw new RuntimeException("timed out waiting for sync"); + } } } } @@ -346,16 +363,19 @@ public class Session extends Invoker } } - public T get(long timeout, int nanos) + public T get(long timeout) { synchronized (this) { - while (!closed.get() && !isDone()) + long start = System.currentTimeMillis(); + long elapsed = 0; + while (!closed.get() && timeout - elapsed > 0 && !isDone()) { try { log.debug("%s waiting for result: %s", Session.this, this); - wait(timeout, nanos); + wait(timeout - elapsed); + elapsed = System.currentTimeMillis() - start; } catch (InterruptedException e) { @@ -364,22 +384,23 @@ public class Session extends Invoker } } - if (!isDone()) + if (isDone()) + { + return result; + } + else if (closed.get()) { throw new RuntimeException("session closed"); } - - return result; - } - - public T get(long timeout) - { - return get(timeout, 0); + else + { + return null; + } } public T get() { - return get(0); + return get(timeout); } public boolean isDone() @@ -396,7 +417,8 @@ public class Session extends Invoker public void close() { - sessionClose(); + sessionRequestTimeout(0); + sessionDetach(name); // XXX: channel.close(); } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java index 5e36a77a98..442aba0e9b 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java @@ -35,20 +35,16 @@ public abstract class SessionDelegate { public void init(Session ssn, ProtocolHeader hdr) { } - public void method(Session ssn, Method method) { - if (method.getEncodedTrack() == Frame.L4) - { - method.setId(ssn.nextCommandId()); - } - + public void control(Session ssn, Method method) { method.dispatch(ssn, this); + } - if (method.getEncodedTrack() == Frame.L4) + public void command(Session ssn, Method method) { + method.setId(ssn.nextCommandId()); + method.dispatch(ssn, this); + if (!method.hasPayload()) { - if (!method.hasPayload()) - { - ssn.processed(method); - } + ssn.processed(method); } } @@ -60,12 +56,12 @@ public abstract class SessionDelegate @Override public void executionResult(Session ssn, ExecutionResult result) { - ssn.result(result.getCommandId(), result.getData()); + ssn.result(result.getCommandId(), result.getValue()); } - @Override public void executionComplete(Session ssn, ExecutionComplete excmp) + @Override public void sessionCompleted(Session ssn, SessionCompleted cmp) { - RangeSet ranges = excmp.getRangedExecutionSet(); + RangeSet ranges = cmp.getCommands(); if (ranges != null) { for (Range range : ranges) @@ -73,12 +69,27 @@ public abstract class SessionDelegate ssn.complete(range.getLower(), range.getUpper()); } } - ssn.complete(excmp.getCumulativeExecutionMark()); } - @Override public void executionFlush(Session ssn, ExecutionFlush flush) + @Override public void sessionFlush(Session ssn, SessionFlush flush) + { + if (flush.getCompleted()) + { + ssn.flushProcessed(); + } + if (flush.getConfirmed()) + { + throw new Error("not implemented"); + } + if (flush.getExpected()) + { + throw new Error("not implemented"); + } + } + + @Override public void sessionCommandPoint(Session ssn, SessionCommandPoint scp) { - ssn.flushProcessed(); + ssn.commandsIn = scp.getCommandId(); } @Override public void executionSync(Session ssn, ExecutionSync sync) diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Struct.java b/java/common/src/main/java/org/apache/qpidity/transport/Struct.java index 200c3b68e3..f901f9e840 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Struct.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Struct.java @@ -71,8 +71,6 @@ public abstract class Struct implements Encodable return type; } - public abstract boolean hasTicket(); - private final boolean isBit(Field<?,?> f) { return f.getType().equals(Boolean.class); @@ -80,14 +78,7 @@ public abstract class Struct implements Encodable private final boolean packed() { - if (this instanceof Method) - { - return false; - } - else - { - return true; - } + return getPackWidth() > 0; } private final boolean encoded(Field<?,?> f) @@ -147,11 +138,6 @@ public abstract class Struct implements Encodable } } - if (hasTicket()) - { - dec.readShort(); - } - for (Field<?,?> f : fields) { if (encoded(f)) @@ -187,11 +173,6 @@ public abstract class Struct implements Encodable } } - if (hasTicket()) - { - enc.writeShort(0x0); - } - for (Field<?,?> f : fields) { if (encoded(f)) diff --git a/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java b/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java index 54429a1a4f..e9a0705de0 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java @@ -2,8 +2,9 @@ package org.apache.qpidity.transport; public class TransportConstants { - private static byte _protocol_version_minor = 0; - private static byte _protocol_version_major = 99; + + private static byte _protocol_version_minor = 10; + private static byte _protocol_version_major = 0; public static void setVersionMajor(byte value) { @@ -24,4 +25,5 @@ public class TransportConstants { return _protocol_version_minor; } + } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java index ae67483a23..0f6180f54a 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java @@ -20,7 +20,10 @@ */ package org.apache.qpidity.transport.codec; +import java.io.UnsupportedEncodingException; + import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -99,19 +102,19 @@ abstract class AbstractDecoder implements Decoder nbits = 0; } - public short readOctet() + public short readUint8() { return uget(); } - public int readShort() + public int readUint16() { int i = uget() << 8; i |= uget(); return i; } - public long readLong() + public long readUint32() { long l = uget() << 24; l |= uget() << 16; @@ -120,7 +123,12 @@ abstract class AbstractDecoder implements Decoder return l; } - public long readLonglong() + public int readSequenceNo() + { + return (int) readUint32(); + } + + public long readUint64() { long l = 0; for (int i = 0; i < 8; i++) @@ -130,31 +138,67 @@ abstract class AbstractDecoder implements Decoder return l; } - public long readTimestamp() + public long readDatetime() + { + return readUint64(); + } + + private static final String decode(byte[] bytes, String charset) { - return readLonglong(); + try + { + return new String(bytes, charset); + } + catch (UnsupportedEncodingException e) + { + throw new RuntimeException(e); + } } - public String readShortstr() + public String readStr8() + { + short size = readUint8(); + byte[] bytes = new byte[size]; + get(bytes); + return decode(bytes, "UTF-8"); + } + + public String readStr16() { - short size = readOctet(); + int size = readUint16(); byte[] bytes = new byte[size]; get(bytes); - return new String(bytes); + return decode(bytes, "UTF-8"); } - public String readLongstr() + public byte[] readVbin8() { - long size = readLong(); - byte[] bytes = new byte[(int) size]; + int size = readUint8(); + byte[] bytes = new byte[size]; get(bytes); - return new String(bytes); + return bytes; } - public RangeSet readRfc1982LongSet() + public byte[] readVbin16() { - int count = readShort()/8; + int size = readUint16(); + byte[] bytes = new byte[size]; + get(bytes); + return bytes; + } + + public byte[] readVbin32() + { + int size = (int) readUint32(); + byte[] bytes = new byte[size]; + get(bytes); + return bytes; + } + + public RangeSet readSequenceSet() + { + int count = readUint16()/8; if (count == 0) { return null; @@ -164,16 +208,21 @@ abstract class AbstractDecoder implements Decoder RangeSet ranges = new RangeSet(); for (int i = 0; i < count; i++) { - ranges.add(readLong(), readLong()); + ranges.add(readUint32(), readUint32()); } return ranges; } } + public RangeSet readByteRanges() + { + throw new Error("not implemented"); + } + public UUID readUuid() { - long msb = readLonglong(); - long lsb = readLonglong(); + long msb = readUint64(); + long lsb = readUint64(); return new UUID(msb, lsb); } @@ -194,34 +243,39 @@ abstract class AbstractDecoder implements Decoder return null; } } + if (type > 0) + { + int code = readUint16(); + assert code == type; + } st.read(this); return st; } - public Struct readLongStruct() + public Struct readStruct32() { - long size = readLong(); + long size = readUint32(); if (size == 0) { return null; } else { - int type = readShort(); + int type = readUint16(); Struct result = Struct.create(type); result.read(this); return result; } } - public Map<String,Object> readTable() + public Map<String,Object> readMap() { - long size = readLong(); + long size = readUint32(); int start = count; Map<String,Object> result = new LinkedHashMap(); while (count < start + size) { - String key = readShortstr(); + String key = readStr8(); byte code = get(); Type t = getType(code); Object value = read(t); @@ -230,9 +284,9 @@ abstract class AbstractDecoder implements Decoder return result; } - public List<Object> readSequence() + public List<Object> readList() { - long size = readLong(); + long size = readUint32(); int start = count; List<Object> result = new ArrayList(); while (count < start + size) @@ -247,10 +301,15 @@ abstract class AbstractDecoder implements Decoder public List<Object> readArray() { - long size = readLong(); + long size = readUint32(); + if (size == 0) + { + return Collections.EMPTY_LIST; + } + byte code = get(); Type t = getType(code); - long count = readLong(); + long count = readUint32(); List<Object> result = new ArrayList<Object>(); for (int i = 0; i < count; i++) @@ -291,11 +350,11 @@ abstract class AbstractDecoder implements Decoder switch (width) { case 1: - return readOctet(); + return readUint8(); case 2: - return readShort(); + return readUint16(); case 4: - return readLong(); + return readUint32(); default: throw new IllegalStateException("illegal width: " + width); } @@ -313,81 +372,72 @@ abstract class AbstractDecoder implements Decoder { switch (t) { - case OCTET: - case UNSIGNED_BYTE: - return readOctet(); - case SIGNED_BYTE: + case BIN8: + case UINT8: + return readUint8(); + case INT8: return get(); case CHAR: return (char) get(); case BOOLEAN: return get() > 0; - case TWO_OCTETS: - case UNSIGNED_SHORT: - return readShort(); + case BIN16: + case UINT16: + return readUint16(); - case SIGNED_SHORT: - return (short) readShort(); + case INT16: + return (short) readUint16(); - case FOUR_OCTETS: - case UNSIGNED_INT: - return readLong(); + case BIN32: + case UINT32: + return readUint32(); - case UTF32_CHAR: - case SIGNED_INT: - return (int) readLong(); + case CHAR_UTF32: + case INT32: + return (int) readUint32(); case FLOAT: - return Float.intBitsToFloat((int) readLong()); + return Float.intBitsToFloat((int) readUint32()); - case EIGHT_OCTETS: - case SIGNED_LONG: - case UNSIGNED_LONG: + case BIN64: + case UINT64: + case INT64: case DATETIME: - return readLonglong(); + return readUint64(); case DOUBLE: - return Double.longBitsToDouble(readLonglong()); - - case SIXTEEN_OCTETS: - case THIRTY_TWO_OCTETS: - case SIXTY_FOUR_OCTETS: - case _128_OCTETS: - case SHORT_BINARY: - case BINARY: - case LONG_BINARY: - return readBytes(t); + return Double.longBitsToDouble(readUint64()); case UUID: return readUuid(); - case SHORT_STRING: - case SHORT_UTF8_STRING: - case SHORT_UTF16_STRING: - case SHORT_UTF32_STRING: - case STRING: - case UTF8_STRING: - case UTF16_STRING: - case UTF32_STRING: - case LONG_STRING: - case LONG_UTF8_STRING: - case LONG_UTF16_STRING: - case LONG_UTF32_STRING: + case STR8: + return readStr8(); + + case STR16: + return readStr16(); + + case STR8_LATIN: + case STR8_UTF16: + case STR16_LATIN: + case STR16_UTF16: // XXX: need to do character conversion return new String(readBytes(t)); - case TABLE: - return readTable(); - case SEQUENCE: - return readSequence(); + case MAP: + return readMap(); + case LIST: + return readList(); case ARRAY: return readArray(); + case STRUCT32: + return readStruct32(); - case FIVE_OCTETS: - case DECIMAL: - case NINE_OCTETS: - case LONG_DECIMAL: + case BIN40: + case DEC32: + case BIN72: + case DEC64: // XXX: what types are we supposed to use here? return readBytes(t); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java index c2dd205d66..56b4537719 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java @@ -20,8 +20,11 @@ */ package org.apache.qpidity.transport.codec; +import java.io.UnsupportedEncodingException; + import java.nio.ByteBuffer; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -48,17 +51,17 @@ abstract class AbstractEncoder implements Encoder static { ENCODINGS.put(Boolean.class, Type.BOOLEAN); - ENCODINGS.put(String.class, Type.LONG_STRING); - ENCODINGS.put(Long.class, Type.SIGNED_LONG); - ENCODINGS.put(Integer.class, Type.SIGNED_INT); - ENCODINGS.put(Short.class, Type.SIGNED_SHORT); - ENCODINGS.put(Byte.class, Type.SIGNED_BYTE); - ENCODINGS.put(Map.class, Type.TABLE); - ENCODINGS.put(List.class, Type.SEQUENCE); + ENCODINGS.put(String.class, Type.STR16); + ENCODINGS.put(Long.class, Type.INT64); + ENCODINGS.put(Integer.class, Type.INT32); + ENCODINGS.put(Short.class, Type.INT16); + ENCODINGS.put(Byte.class, Type.INT8); + ENCODINGS.put(Map.class, Type.MAP); + ENCODINGS.put(List.class, Type.LIST); ENCODINGS.put(Float.class, Type.FLOAT); ENCODINGS.put(Double.class, Type.DOUBLE); ENCODINGS.put(Character.class, Type.CHAR); - ENCODINGS.put(byte[].class, Type.LONG_BINARY); + ENCODINGS.put(byte[].class, Type.VBIN32); } protected Sizer sizer() @@ -120,14 +123,14 @@ abstract class AbstractEncoder implements Encoder flushBits(); } - public void writeOctet(short b) + public void writeUint8(short b) { assert b < 0x100; put((byte) b); } - public void writeShort(int s) + public void writeUint16(int s) { assert s < 0x10000; @@ -135,7 +138,7 @@ abstract class AbstractEncoder implements Encoder put(lsb(s)); } - public void writeLong(long i) + public void writeUint32(long i) { assert i < 0x100000000L; @@ -145,7 +148,12 @@ abstract class AbstractEncoder implements Encoder put(lsb(i)); } - public void writeLonglong(long l) + public void writeSequenceNo(int i) + { + writeUint32(i); + } + + public void writeUint64(long l) { for (int i = 0; i < 8; i++) { @@ -154,47 +162,101 @@ abstract class AbstractEncoder implements Encoder } - public void writeTimestamp(long l) + public void writeDatetime(long l) + { + writeUint64(l); + } + + private static final String checkLength(String s, int n) + { + if (s == null) + { + return ""; + } + + if (s.length() > n) + { + throw new IllegalArgumentException("string too long: " + s); + } + else + { + return s; + } + } + + private static final byte[] encode(String s, String charset) + { + try + { + return s.getBytes(charset); + } + catch (UnsupportedEncodingException e) + { + throw new RuntimeException(e); + } + } + + public void writeStr8(String s) { - writeLonglong(l); + s = checkLength(s, 255); + writeUint8((short) s.length()); + put(ByteBuffer.wrap(encode(s, "UTF-8"))); } + public void writeStr16(String s) + { + s = checkLength(s, 65535); + writeUint16(s.length()); + put(ByteBuffer.wrap(encode(s, "UTF-8"))); + } - public void writeShortstr(String s) + public void writeVbin8(byte[] bytes) { - if (s == null) { s = ""; } - if (s.length() > 255) { - throw new IllegalArgumentException(s); + if (bytes == null) { bytes = new byte[0]; } + if (bytes.length > 255) + { + throw new IllegalArgumentException("array too long: " + bytes.length); } - writeOctet((short) s.length()); - put(ByteBuffer.wrap(s.getBytes())); + writeUint8((short) bytes.length); + put(ByteBuffer.wrap(bytes)); } - public void writeLongstr(String s) + public void writeVbin16(byte[] bytes) { - if (s == null) { s = ""; } - writeLong(s.length()); - put(ByteBuffer.wrap(s.getBytes())); + if (bytes == null) { bytes = new byte[0]; } + writeUint16(bytes.length); + put(ByteBuffer.wrap(bytes)); } + public void writeVbin32(byte[] bytes) + { + if (bytes == null) { bytes = new byte[0]; } + writeUint32(bytes.length); + put(ByteBuffer.wrap(bytes)); + } - public void writeRfc1982LongSet(RangeSet ranges) + public void writeSequenceSet(RangeSet ranges) { if (ranges == null) { - writeShort((short) 0); + writeUint16((short) 0); } else { - writeShort(ranges.size() * 8); + writeUint16(ranges.size() * 8); for (Range range : ranges) { - writeLong(range.getLower()); - writeLong(range.getUpper()); + writeUint32(range.getLower()); + writeUint32(range.getUpper()); } } } + public void writeByteRanges(RangeSet ranges) + { + throw new Error("not implemented"); + } + public void writeUuid(UUID uuid) { long msb = 0; @@ -202,15 +264,10 @@ abstract class AbstractEncoder implements Encoder if (uuid != null) { msb = uuid.getMostSignificantBits(); - uuid.getLeastSignificantBits(); + lsb = uuid.getLeastSignificantBits(); } - writeLonglong(msb); - writeLonglong(lsb); - } - - public void writeContent(String c) - { - throw new Error("Deprecated"); + writeUint64(msb); + writeUint64(lsb); } public void writeStruct(int type, Struct s) @@ -237,22 +294,27 @@ abstract class AbstractEncoder implements Encoder } } + if (type > 0) + { + writeUint16(type); + } + s.write(this); } - public void writeLongStruct(Struct s) + public void writeStruct32(Struct s) { if (s == null) { - writeLong(0); + writeUint32(0); } else { Sizer sizer = sizer(); - sizer.writeShort(s.getEncodedType()); + sizer.writeUint16(s.getEncodedType()); s.write(sizer); - writeLong(sizer.size()); - writeShort(s.getEncodedType()); + writeUint32(sizer.size()); + writeUint16(s.getEncodedType()); s.write(this); } } @@ -309,46 +371,46 @@ abstract class AbstractEncoder implements Encoder return null; } - public void writeTable(Map<String,Object> table) + public void writeMap(Map<String,Object> map) { - if (table == null) + if (map == null) { - writeLong(0); + writeUint32(0); return; } Sizer sizer = sizer(); - sizer.writeTable(table); + sizer.writeMap(map); // XXX: - 4 - writeLong(sizer.size() - 4); - writeTableEntries(table); + writeUint32(sizer.size() - 4); + writeMapEntries(map); } - protected void writeTableEntries(Map<String,Object> table) + protected void writeMapEntries(Map<String,Object> map) { - for (Map.Entry<String,Object> entry : table.entrySet()) + for (Map.Entry<String,Object> entry : map.entrySet()) { String key = entry.getKey(); Object value = entry.getValue(); Type type = encoding(value); - writeShortstr(key); + writeStr8(key); put(type.code); write(type, value); } } - public void writeSequence(List<Object> sequence) + public void writeList(List<Object> list) { Sizer sizer = sizer(); - sizer.writeSequence(sequence); + sizer.writeList(list); // XXX: - 4 - writeLong(sizer.size() - 4); - writeSequenceEntries(sequence); + writeUint32(sizer.size() - 4); + writeListEntries(list); } - protected void writeSequenceEntries(List<Object> sequence) + protected void writeListEntries(List<Object> list) { - for (Object value : sequence) + for (Object value : list) { Type type = encoding(value); put(type.code); @@ -358,10 +420,15 @@ abstract class AbstractEncoder implements Encoder public void writeArray(List<Object> array) { + if (array == null) + { + array = Collections.EMPTY_LIST; + } + Sizer sizer = sizer(); sizer.writeArray(array); // XXX: -4 - writeLong(sizer.size() - 4); + writeUint32(sizer.size() - 4); writeArrayEntries(array); } @@ -371,7 +438,7 @@ abstract class AbstractEncoder implements Encoder if (array.isEmpty()) { - type = Type.VOID; + return; } else { @@ -380,6 +447,8 @@ abstract class AbstractEncoder implements Encoder put(type.code); + writeUint32(array.size()); + for (Object value : array) { write(type, value); @@ -409,13 +478,13 @@ abstract class AbstractEncoder implements Encoder switch (width) { case 1: - writeOctet((short) size); + writeUint8((short) size); break; case 2: - writeShort(size); + writeUint16(size); break; case 4: - writeLong(size); + writeUint32(size); break; default: throw new IllegalStateException("illegal width: " + width); @@ -444,11 +513,11 @@ abstract class AbstractEncoder implements Encoder { switch (t) { - case OCTET: - case UNSIGNED_BYTE: - writeOctet(coerce(Short.class, value)); + case BIN8: + case UINT8: + writeUint8(coerce(Short.class, value)); break; - case SIGNED_BYTE: + case INT8: put(coerce(Byte.class, value)); break; case CHAR: @@ -465,85 +534,78 @@ abstract class AbstractEncoder implements Encoder } break; - case TWO_OCTETS: - case UNSIGNED_SHORT: - writeShort(coerce(Integer.class, value)); + case BIN16: + case UINT16: + writeUint16(coerce(Integer.class, value)); break; - case SIGNED_SHORT: - writeShort(coerce(Short.class, value)); + case INT16: + writeUint16(coerce(Short.class, value)); break; - case FOUR_OCTETS: - case UNSIGNED_INT: - writeLong(coerce(Long.class, value)); + case BIN32: + case UINT32: + writeUint32(coerce(Long.class, value)); break; - case UTF32_CHAR: - case SIGNED_INT: - writeLong(coerce(Integer.class, value)); + case CHAR_UTF32: + case INT32: + writeUint32(coerce(Integer.class, value)); break; case FLOAT: - writeLong(Float.floatToIntBits(coerce(Float.class, value))); + writeUint32(Float.floatToIntBits(coerce(Float.class, value))); break; - case EIGHT_OCTETS: - case SIGNED_LONG: - case UNSIGNED_LONG: + case BIN64: + case UINT64: + case INT64: case DATETIME: - writeLonglong(coerce(Long.class, value)); + writeUint64(coerce(Long.class, value)); break; case DOUBLE: long bits = Double.doubleToLongBits(coerce(Double.class, value)); - writeLonglong(bits); - break; - - case SIXTEEN_OCTETS: - case THIRTY_TWO_OCTETS: - case SIXTY_FOUR_OCTETS: - case _128_OCTETS: - case SHORT_BINARY: - case BINARY: - case LONG_BINARY: - writeBytes(t, coerce(byte[].class, value)); + writeUint64(bits); break; case UUID: writeUuid(coerce(UUID.class, value)); break; - case SHORT_STRING: - case SHORT_UTF8_STRING: - case SHORT_UTF16_STRING: - case SHORT_UTF32_STRING: - case STRING: - case UTF8_STRING: - case UTF16_STRING: - case UTF32_STRING: - case LONG_STRING: - case LONG_UTF8_STRING: - case LONG_UTF16_STRING: - case LONG_UTF32_STRING: + case STR8: + writeStr8(coerce(String.class, value)); + break; + + case STR16: + writeStr16(coerce(String.class, value)); + break; + + case STR8_LATIN: + case STR8_UTF16: + case STR16_LATIN: + case STR16_UTF16: // XXX: need to do character conversion writeBytes(t, coerce(String.class, value).getBytes()); break; - case TABLE: - writeTable((Map<String,Object>) coerce(Map.class, value)); + case MAP: + writeMap((Map<String,Object>) coerce(Map.class, value)); break; - case SEQUENCE: - writeSequence(coerce(List.class, value)); + case LIST: + writeList(coerce(List.class, value)); break; case ARRAY: writeArray(coerce(List.class, value)); break; + case STRUCT32: + writeStruct32(coerce(Struct.class, value)); + break; - case FIVE_OCTETS: - case DECIMAL: - case NINE_OCTETS: - case LONG_DECIMAL: + case BIN40: + case DEC32: + case BIN72: + case DEC64: // XXX: what types are we supposed to use here? writeBytes(t, coerce(byte[].class, value)); break; diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java index f0738e0a91..df2d208118 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java @@ -38,26 +38,30 @@ public interface Decoder { boolean readBit(); - short readOctet(); - int readShort(); - long readLong(); - long readLonglong(); + short readUint8(); + int readUint16(); + long readUint32(); + long readUint64(); - long readTimestamp(); - - String readShortstr(); - String readLongstr(); - - RangeSet readRfc1982LongSet(); + long readDatetime(); UUID readUuid(); - String readContent(); + int readSequenceNo(); + RangeSet readSequenceSet(); // XXX + RangeSet readByteRanges(); // XXX - Struct readStruct(int type); - Struct readLongStruct(); + String readStr8(); + String readStr16(); - Map<String,Object> readTable(); - List<Object> readSequence(); + byte[] readVbin8(); + byte[] readVbin16(); + byte[] readVbin32(); + + Struct readStruct32(); + Map<String,Object> readMap(); + List<Object> readList(); List<Object> readArray(); + Struct readStruct(int type); + } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java index 1b2fe0213e..0449ba6702 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java @@ -40,26 +40,30 @@ public interface Encoder void flush(); void writeBit(boolean b); - void writeOctet(short b); - void writeShort(int s); - void writeLong(long i); - void writeLonglong(long l); + void writeUint8(short b); + void writeUint16(int s); + void writeUint32(long i); + void writeUint64(long l); - void writeTimestamp(long l); - - void writeShortstr(String s); - void writeLongstr(String s); - - void writeRfc1982LongSet(RangeSet ranges); + void writeDatetime(long l); void writeUuid(UUID uuid); - void writeContent(String c); + void writeSequenceNo(int s); + void writeSequenceSet(RangeSet ranges); // XXX + void writeByteRanges(RangeSet ranges); // XXX - void writeStruct(int type, Struct s); - void writeLongStruct(Struct s); + void writeStr8(String s); + void writeStr16(String s); - void writeTable(Map<String,Object> table); - void writeSequence(List<Object> sequence); + void writeVbin8(byte[] bytes); + void writeVbin16(byte[] bytes); + void writeVbin32(byte[] bytes); + + void writeStruct32(Struct s); + void writeMap(Map<String,Object> map); + void writeList(List<Object> list); void writeArray(List<Object> array); + void writeStruct(int type, Struct s); + } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java index b98bf98239..8ffdd5150b 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java @@ -38,30 +38,34 @@ public interface Sizer extends Encoder public static final Sizer NULL = new Sizer() { - public void flush() {}; + public void flush() {} - public void writeBit(boolean b) {}; - public void writeOctet(short b) {}; - public void writeShort(int s) {}; - public void writeLong(long i) {}; - public void writeLonglong(long l) {}; + public void writeBit(boolean b) {} + public void writeUint8(short b) {} + public void writeUint16(int s) {} + public void writeUint32(long i) {} + public void writeUint64(long l) {} - public void writeTimestamp(long l) {}; + public void writeDatetime(long l) {} + public void writeUuid(UUID uuid) {} - public void writeShortstr(String s) {}; - public void writeLongstr(String s) {}; + public void writeSequenceNo(int s) {} + public void writeSequenceSet(RangeSet ranges) {} // XXX + public void writeByteRanges(RangeSet ranges) {} // XXX - public void writeRfc1982LongSet(RangeSet ranges) {}; - public void writeUuid(UUID uuid) {}; + public void writeStr8(String s) {} + public void writeStr16(String s) {} - public void writeContent(String c) {}; + public void writeVbin8(byte[] bytes) {} + public void writeVbin16(byte[] bytes) {} + public void writeVbin32(byte[] bytes) {} - public void writeStruct(int type, Struct s) {}; - public void writeLongStruct(Struct s) {}; + public void writeStruct32(Struct s) {} + public void writeMap(Map<String,Object> map) {} + public void writeList(List<Object> list) {} + public void writeArray(List<Object> array) {} - public void writeTable(Map<String,Object> table) {}; - public void writeSequence(List<Object> sequence) {}; - public void writeArray(List<Object> array) {}; + public void writeStruct(int type, Struct s) {} public int getSize() { return 0; } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java index 5c35596fd8..d493245f0c 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java @@ -37,6 +37,7 @@ import org.apache.qpidity.transport.ProtocolError; import org.apache.qpidity.transport.ProtocolEvent; import org.apache.qpidity.transport.ProtocolHeader; import org.apache.qpidity.transport.Receiver; +import org.apache.qpidity.transport.SegmentType; import org.apache.qpidity.transport.Struct; @@ -118,7 +119,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate { switch (frame.getType()) { - case Frame.BODY: + case BODY: emit(frame, new Data(frame, frame.isFirstFrame(), frame.isLastFrame())); break; @@ -158,22 +159,29 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate } } - private ProtocolEvent decode(Frame frame, byte type, List<ByteBuffer> segment) + private ProtocolEvent decode(Frame frame, SegmentType type, List<ByteBuffer> segment) { FragmentDecoder dec = new FragmentDecoder(segment.iterator()); switch (type) { - case Frame.METHOD: - int methodType = dec.readShort(); - Method method = Method.create(methodType); - method.read(dec); - return method; - case Frame.HEADER: + case CONTROL: + int controlType = dec.readUint16(); + Method control = Method.create(controlType); + control.read(dec); + return control; + case COMMAND: + int commandType = dec.readUint16(); + // read in the session header, right now we don't use it + dec.readUint16(); + Method command = Method.create(commandType); + command.read(dec); + return command; + case HEADER: List<Struct> structs = new ArrayList(); while (dec.hasRemaining()) { - structs.add(dec.readLongStruct()); + structs.add(dec.readStruct32()); } return new Header(structs,frame.isLastFrame() && frame.isLastSegment()); default: diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java index d38d8ded98..0357df6e86 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java @@ -31,6 +31,7 @@ import org.apache.qpidity.transport.ProtocolDelegate; import org.apache.qpidity.transport.ProtocolError; import org.apache.qpidity.transport.ProtocolEvent; import org.apache.qpidity.transport.ProtocolHeader; +import org.apache.qpidity.transport.SegmentType; import org.apache.qpidity.transport.Sender; import org.apache.qpidity.transport.Struct; @@ -76,50 +77,50 @@ public class Disassembler implements Sender<ConnectionEvent>, sender.close(); } - private void fragment(byte flags, byte type, ConnectionEvent event, + private void fragment(byte flags, SegmentType type, ConnectionEvent event, ByteBuffer buf, boolean first, boolean last) { if(!buf.hasRemaining()) { //empty data - byte nflags = flags; + byte nflags = flags; if (first) { nflags |= FIRST_FRAME; first = false; } - nflags |= LAST_FRAME; - Frame frame = new Frame(nflags, type, + nflags |= LAST_FRAME; + Frame frame = new Frame(nflags, type, event.getProtocolEvent().getEncodedTrack(), event.getChannel()); - // frame.addFragment(buf); + // frame.addFragment(buf); sender.send(frame); } else { - while (buf.hasRemaining()) - { - ByteBuffer slice = buf.slice(); - slice.limit(min(maxPayload, slice.remaining())); - buf.position(buf.position() + slice.remaining()); - - byte newflags = flags; - if (first) + while (buf.hasRemaining()) { - newflags |= FIRST_FRAME; - first = false; + ByteBuffer slice = buf.slice(); + slice.limit(min(maxPayload, slice.remaining())); + buf.position(buf.position() + slice.remaining()); + + byte newflags = flags; + if (first) + { + newflags |= FIRST_FRAME; + first = false; + } + if (last && !buf.hasRemaining()) + { + newflags |= LAST_FRAME; + } + + Frame frame = new Frame(newflags, type, + event.getProtocolEvent().getEncodedTrack(), + event.getChannel()); + frame.addFragment(slice); + sender.send(frame); } - if (last && !buf.hasRemaining()) - { - newflags |= LAST_FRAME; - } - - Frame frame = new Frame(newflags, type, - event.getProtocolEvent().getEncodedTrack(), - event.getChannel()); - frame.addFragment(slice); - sender.send(frame); - } } } @@ -128,15 +129,40 @@ public class Disassembler implements Sender<ConnectionEvent>, sender.send(header); } - public void method(ConnectionEvent event, Method method) + public void control(ConnectionEvent event, Method method) + { + method(event, method, SegmentType.CONTROL); + } + + public void command(ConnectionEvent event, Method method) + { + method(event, method, SegmentType.COMMAND); + } + + private void method(ConnectionEvent event, Method method, SegmentType type) { SizeEncoder sizer = new SizeEncoder(); - sizer.writeShort(method.getEncodedType()); + sizer.writeUint16(method.getEncodedType()); + if (type == SegmentType.COMMAND) + { + sizer.writeUint16(0); + } method.write(sizer); ByteBuffer buf = ByteBuffer.allocate(sizer.size()); BBEncoder enc = new BBEncoder(buf); - enc.writeShort(method.getEncodedType()); + enc.writeUint16(method.getEncodedType()); + if (type == SegmentType.COMMAND) + { + if (method.isSync()) + { + enc.writeUint16(0x0101); + } + else + { + enc.writeUint16(0x0100); + } + } method.write(enc); enc.flush(); buf.flip(); @@ -148,7 +174,7 @@ public class Disassembler implements Sender<ConnectionEvent>, flags |= LAST_SEG; } - fragment(flags, METHOD, event, buf, true, true); + fragment(flags, type, event, buf, true, true); } public void header(ConnectionEvent event, Header header) @@ -159,24 +185,24 @@ public class Disassembler implements Sender<ConnectionEvent>, SizeEncoder sizer = new SizeEncoder(); for (Struct st : header.getStructs()) { - sizer.writeLongStruct(st); + sizer.writeStruct32(st); } buf = ByteBuffer.allocate(sizer.size()); BBEncoder enc = new BBEncoder(buf); for (Struct st : header.getStructs()) { - enc.writeLongStruct(st); + enc.writeStruct32(st); enc.flush(); } header.setBuf(buf); } else { - buf = header.getBuf(); + buf = header.getBuf(); } buf.flip(); - fragment((byte) 0x0, HEADER, event, buf, true, true); + fragment((byte) 0x0, SegmentType.HEADER, event, buf, true, true); } public void data(ConnectionEvent event, Data data) @@ -187,7 +213,7 @@ public class Disassembler implements Sender<ConnectionEvent>, { ByteBuffer buf = it.next(); boolean last = data.isLast() && !it.hasNext(); - fragment(LAST_SEG, BODY, event, buf, first, last); + fragment(LAST_SEG, SegmentType.BODY, event, buf, first, last); first = false; } } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java index a5c5db4dba..c5fb7b9986 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java @@ -20,6 +20,7 @@ */ package org.apache.qpidity.transport.network; +import org.apache.qpidity.transport.SegmentType; import org.apache.qpidity.transport.util.SliceIterator; import java.nio.ByteBuffer; @@ -48,10 +49,6 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer> public static final byte L3 = 2; public static final byte L4 = 3; - public static final byte METHOD = 1; - public static final byte HEADER = 2; - public static final byte BODY = 3; - public static final byte RESERVED = 0x0; public static final byte VERSION = 0x0; @@ -62,13 +59,13 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer> public static final byte LAST_FRAME = 0x1; final private byte flags; - final private byte type; + final private SegmentType type; final private byte track; final private int channel; final private List<ByteBuffer> fragments; private int size; - public Frame(byte flags, byte type, byte track, int channel) + public Frame(byte flags, SegmentType type, byte track, int channel) { this.flags = flags; this.type = type; @@ -99,7 +96,7 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer> return size; } - public byte getType() + public SegmentType getType() { return type; } @@ -153,7 +150,7 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer> { StringBuilder str = new StringBuilder(); str.append(String.format - ("[%05d %05d %1d %1d %d%d%d%d]", getChannel(), getSize(), + ("[%05d %05d %1d %1d %d%d%d%d] ", getChannel(), getSize(), getTrack(), getType(), isFirstSegment() ? 1 : 0, isLastSegment() ? 1 : 0, isFirstFrame() ? 1 : 0, isLastFrame() ? 1 : 0)); @@ -170,7 +167,7 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer> str.append(" | "); } - str.append(str(buf, 20)); + str.append(str(buf)); } return str.toString(); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java index 871c45743e..2d41a9f516 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java @@ -22,10 +22,10 @@ package org.apache.qpidity.transport.network; import java.nio.ByteBuffer; -import org.apache.qpidity.transport.Constant; import org.apache.qpidity.transport.ProtocolError; import org.apache.qpidity.transport.ProtocolHeader; import org.apache.qpidity.transport.Receiver; +import org.apache.qpidity.transport.SegmentType; import static org.apache.qpidity.transport.util.Functions.*; @@ -77,7 +77,7 @@ public class InputHandler implements Receiver<ByteBuffer> private byte minor; private byte flags; - private byte type; + private SegmentType type; private byte track; private int channel; private int size; @@ -146,7 +146,7 @@ public class InputHandler implements Receiver<ByteBuffer> flags = buf.get(); return FRAME_HDR_TYPE; case FRAME_HDR_TYPE: - type = buf.get(); + type = SegmentType.get(buf.get()); return FRAME_HDR_SIZE1; case FRAME_HDR_SIZE1: size = (0xFF & buf.get()) << 8; @@ -218,7 +218,7 @@ public class InputHandler implements Receiver<ByteBuffer> return FRAME_END; } case FRAME_END: - return expect(buf, Constant.FRAME_END, FRAME_HDR); + return expect(buf, OutputHandler.FRAME_END, FRAME_HDR); default: throw new IllegalStateException(); } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java index 9f770bcb1c..8f615cf80d 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java @@ -67,11 +67,13 @@ public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate } } + public static final int FRAME_END = 0xCE; + public void frame(Frame frame) { ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE + frame.getSize() + 1); hdr.put(frame.getFlags()); - hdr.put(frame.getType()); + hdr.put((byte) frame.getType().getValue()); hdr.putShort((short) (frame.getSize() + HEADER_SIZE)); hdr.put(RESERVED); hdr.put(frame.getTrack()); @@ -84,7 +86,7 @@ public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate { hdr.put(buf); } - hdr.put((byte) Constant.FRAME_END); + hdr.put((byte) FRAME_END); hdr.flip(); synchronized (lock) { diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java index 2e4875cf42..7da719087c 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java @@ -200,7 +200,7 @@ public class MinaHandler<E> implements IoHandler ConnectionDelegate delegate) { return connect(host, port, new ConnectionBinding - (delegate, InputHandler.State.FRAME_HDR)); + (delegate, InputHandler.State.PROTO_HDR)); } private static class ConnectionBinding diff --git a/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java b/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java index 7dbb3d30e8..14e756c047 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java @@ -52,15 +52,24 @@ public class Functions public static final String str(ByteBuffer buf, int limit) { StringBuilder str = new StringBuilder(); + str.append('"'); + for (int i = 0; i < min(buf.remaining(), limit); i++) { - if (i > 0 && i % 2 == 0) + byte c = buf.get(buf.position() + i); + + if (c > 31 && c < 127 && c != '\\') { - str.append(" "); + str.append((char)c); + } + else + { + str.append(String.format("\\x%02x", c)); } - str.append(String.format("%02x", buf.get(buf.position() + i))); } + str.append('"'); + return str.toString(); } diff --git a/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java index d7ece9b745..1a83786e12 100644 --- a/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java +++ b/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java @@ -100,12 +100,12 @@ public class ConnectionTest extends TestCase } Channel ch = conn.getChannel(0); - Session ssn = new Session(); + Session ssn = new Session("test".getBytes()); ssn.attach(ch); try { - ssn.sessionOpen(1234); + ssn.sessionAttach(ssn.getName()); fail("writing to a closed socket succeeded"); } catch (TransportException e) diff --git a/java/common/templating.py b/java/common/templating.py new file mode 100644 index 0000000000..832b7ecb9c --- /dev/null +++ b/java/common/templating.py @@ -0,0 +1,101 @@ + +class Parser: + + def __init__(self, **kwargs): + self.output = "" + self.environ = {"out": self.parse} + for k, v in kwargs.items(): + self.environ[k] = v + self.text = "" + self.level = 0 + self.line = None + + def action(self, actor): + text = self.text + self.text = "" + actor(text) + + def out(self, text): + self.output += text + + def prefix_lines(self, text): + return "%s%s" % ("\n"*(self.line - 1 - text.count("\n")), text) + + def evaluate(self, text): + self.out(str(eval(self.prefix_lines(text), self.environ, self.environ))) + + def execute(self, text): + exec self.prefix_lines(text) in self.environ, self.environ + + def parse(self, input): + old_line = self.line + try: + state = self.start + self.line = 1 + for ch in input: + state = state(ch) + if ch == "\n": + self.line += 1 + if state == self.start: + self.action(self.out) + elif state == self.alnum: + self.action(self.evaluate) + else: + raise ParseError() + finally: + self.line = old_line + + def start(self, ch): + if ch == "$": + return self.dollar + else: + self.text += ch + return self.start + + def dollar(self, ch): + if ch == "$": + self.text += "$" + return self.start + elif ch == "(": + self.action(self.out) + return self.expression + elif ch == "{": + self.action(self.out) + return self.block + else: + self.action(self.out) + self.text += ch + return self.alnum + + def alnum(self, ch): + if ch.isalnum(): + self.text += ch + return self.alnum + else: + self.action(self.evaluate) + self.text += ch + return self.start + + def match(self, ch, start, end): + if ch == start: + self.level += 1 + if ch == end: + self.level -= 1 + + def block(self, ch): + if not self.level and ch == "}": + self.action(self.execute) + return self.start + else: + self.match(ch, "{", "}") + self.text += ch + return self.block + + def expression(self, ch): + if not self.level and ch == ")": + self.action(self.evaluate) + return self.start + else: + self.match(ch, "(", ")") + self.text += ch + return self.expression |