diff options
author | Rafael H. Schloming <rhs@apache.org> | 2008-04-16 13:32:13 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2008-04-16 13:32:13 +0000 |
commit | d054b41aaa1466b65c9dc2acf1b22ca98ec3128c (patch) | |
tree | 08055eba3020d3dcad5c9a9587d98b15d0b97c89 /java/common | |
parent | f375be1908ad22329fe9ed21a8c196475ade7e59 (diff) | |
download | qpid-python-d054b41aaa1466b65c9dc2acf1b22ca98ec3128c.tar.gz |
QPID-901: updates to the java client to use the 0-10 final spec instead of the 0-10 preview spec; this includes improvements to the codegen process as well as some modifications to the shared code path in the client to not lose per message state when consumers are closed.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@648692 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
42 files changed, 1493 insertions, 1044 deletions
diff --git a/java/common/Composite.tpl b/java/common/Composite.tpl new file mode 100644 index 0000000000..4172ffdefc --- /dev/null +++ b/java/common/Composite.tpl @@ -0,0 +1,173 @@ +package org.apache.qpidity.transport; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.qpidity.transport.codec.Decoder; +import org.apache.qpidity.transport.codec.Encodable; +import org.apache.qpidity.transport.codec.Encoder; + +import org.apache.qpidity.transport.network.Frame; + +${ +from genutil import * + +cls = klass(type)["@name"] + +if type.name in ("control", "command"): + base = "Method" + size = 0 + pack = 2 + if type["segments"]: + payload = "true" + else: + payload = "false" + if type.name == "control" and cls == "connection": + track = "Frame.L1" + elif cls == "session" and type["@name"] in ("attach", "attached", "detach", "detached"): + track = "Frame.L2" + elif type.name == "command": + track = "Frame.L4" + else: + track = "Frame.L3" +else: + base = "Struct" + size = type["@size"] + pack = type["@pack"] + payload = "false" + track = "-1" + +typecode = code(type) +} + +public class $name extends $base { + + public static final int TYPE = $typecode; + + public final int getStructType() { + return TYPE; + } + + public final int getSizeWidth() { + return $size; + } + + public final int getPackWidth() { + return $pack; + } + + public final boolean hasPayload() { + return $payload; + } + + public final byte getEncodedTrack() { + return $track; + } + + private static final List<Field<?,?>> FIELDS = new ArrayList<Field<?,?>>(); + public List<Field<?,?>> getFields() { return FIELDS; } + +${ +fields = get_fields(type) +params = get_parameters(fields) +options = get_options(fields) + +for f in fields: + out(" private boolean has_$(f.name);\n") + out(" private $(f.type) $(f.name);\n") +} + +${ +if fields: + out(" public $name() {}\n") +} + + public $name($(", ".join(params))) { +${ +for f in fields: + if f.option: continue + out(" $(f.set)($(f.name));\n") + +for f in options: + out(" boolean _$(f.name) = false;\n") + +if options: + out(""" + for (int i=0; i < _options.length; i++) { + switch (_options[i]) { +""") + + for f in options: + out(" case $(f.option): _$(f.name) = true; break;\n") + + out(""" case NO_OPTION: break; + default: throw new IllegalArgumentException("invalid option: " + _options[i]); + } + } +""") + +for f in options: + out(" $(f.set)(_$(f.name));\n") +} + } + + public <C> void dispatch(C context, MethodDelegate<C> delegate) { + delegate.$(dromedary(name))(context, this); + } + +${ +for f in fields: + out(""" + public final boolean $(f.has)() { + return has_$(f.name); + } + + public final $name $(f.clear)() { + this.has_$(f.name) = false; + this.$(f.name) = $(f.default); + this.dirty = true; + return this; + } + + public final $(f.type) $(f.get)() { + return $(f.name); + } + + public final $name $(f.set)($(f.type) value) { + this.$(f.name) = value; + this.has_$(f.name) = true; + this.dirty = true; + return this; + } + + public final $name $(f.name)($(f.type) value) { + this.$(f.name) = value; + this.has_$(f.name) = true; + this.dirty = true; + return this; + } + + static { + FIELDS.add(new Field<$name,$(jref(jclass(f.type)))>($name.class, $(jref(jclass(f.type))).class, "$(f.name)", $(f.index)) { + public boolean has(Object struct) { + return check(struct).has_$(f.name); + } + public void has(Object struct, boolean value) { + check(struct).has_$(f.name) = value; + } + public $(jref(f.type)) get(Object struct) { + return check(struct).$(f.get)(); + } + public void read(Decoder dec, Object struct) { + check(struct).$(f.name) = $(f.read); + check(struct).dirty = true; + } + public void write(Encoder enc, Object struct) { + $(f.write); + } + }); + } +""") +}} diff --git a/java/common/Constant.tpl b/java/common/Constant.tpl new file mode 100644 index 0000000000..695812ea75 --- /dev/null +++ b/java/common/Constant.tpl @@ -0,0 +1,14 @@ +package org.apache.qpidity.transport; + +${from genutil import *} + +public interface Constant +{ +${ +constants = spec.query["amqp/constant"] + +for c in constants: + name = scream(c["@name"]) + value = c["@value"] + out(" public static final int $name = $value;\n") +}} diff --git a/java/common/Enum.tpl b/java/common/Enum.tpl new file mode 100644 index 0000000000..337feb7065 --- /dev/null +++ b/java/common/Enum.tpl @@ -0,0 +1,36 @@ +package org.apache.qpidity.transport; + +public enum $name { +${ +from genutil import * + +vtype = jtype(resolve_type(type)) + +choices = [(scream(ch["@name"]), "(%s) %s" % (vtype, ch["@value"])) + for ch in type.query["enum/choice"]] +} + $(",\n ".join(["%s(%s)" % ch for ch in choices])); + + private final $vtype value; + + $name($vtype value) + { + this.value = value; + } + + public $vtype getValue() + { + return value; + } + + public static $name get($vtype value) + { + switch (value) + { +${ +for ch, value in choices: + out(' case $value: return $ch;\n') +} default: throw new IllegalArgumentException("no such value: " + value); + } + } +} diff --git a/java/common/Invoker.tpl b/java/common/Invoker.tpl new file mode 100644 index 0000000000..d9905c71a0 --- /dev/null +++ b/java/common/Invoker.tpl @@ -0,0 +1,41 @@ +package org.apache.qpidity.transport; + +import java.util.List; +import java.util.Map; +import java.util.UUID; + +public abstract class Invoker { + + protected abstract void invoke(Method method); + protected abstract <T> Future<T> invoke(Method method, Class<T> resultClass); + +${ +from genutil import * + +for c in composites: + name = cname(c) + fields = get_fields(c) + params = get_parameters(fields) + args = get_arguments(fields) + result = c["result"] + if result: + if not result["@type"]: + rname = cname(result["struct"]) + else: + rname = cname(result, "@type") + jresult = "Future<%s>" % rname + jreturn = "return " + jclass = ", %s.class" % rname + else: + jresult = "void" + jreturn = "" + jclass = "" + + out(""" + public $jresult $(dromedary(name))($(", ".join(params))) { + $(jreturn)invoke(new $name($(", ".join(args)))$jclass); + } +""") +} + +} diff --git a/java/common/MethodDelegate.tpl b/java/common/MethodDelegate.tpl new file mode 100644 index 0000000000..e5ab1ae1e7 --- /dev/null +++ b/java/common/MethodDelegate.tpl @@ -0,0 +1,12 @@ +package org.apache.qpidity.transport; + +public abstract class MethodDelegate<C> { + +${ +from genutil import * + +for c in composites: + name = cname(c) + out(" public void $(dromedary(name))(C context, $name struct) {}\n") +} +} diff --git a/java/common/Option.tpl b/java/common/Option.tpl new file mode 100644 index 0000000000..5fa2b95b9f --- /dev/null +++ b/java/common/Option.tpl @@ -0,0 +1,19 @@ +package org.apache.qpidity.transport; + +public enum Option { + +${ +from genutil import * + +options = {} + +for c in composites: + for f in c.query["field"]: + t = resolve_type(f) + if t["@name"] == "bit": + option = scream(f["@name"]) + if not options.has_key(option): + options[option] = None + out(" $option,\n")} + NO_OPTION +} diff --git a/java/common/StructFactory.tpl b/java/common/StructFactory.tpl new file mode 100644 index 0000000000..b27621b1d2 --- /dev/null +++ b/java/common/StructFactory.tpl @@ -0,0 +1,39 @@ +package org.apache.qpidity.transport; + +class StructFactory { + + public static Struct create(int type) + { + switch (type) + { +${ +from genutil import * + +fragment = """ case $name.TYPE: + return new $name(); +""" + +for c in composites: + name = cname(c) + if c.name == "struct": + out(fragment) +} default: + throw new IllegalArgumentException("type: " + type); + } + } + + public static Struct createInstruction(int type) + { + switch (type) + { +${ +for c in composites: + name = cname(c) + if c.name in ("command", "control"): + out(fragment) +} default: + throw new IllegalArgumentException("type: " + type); + } + } + +} diff --git a/java/common/Type.tpl b/java/common/Type.tpl new file mode 100644 index 0000000000..c869934538 --- /dev/null +++ b/java/common/Type.tpl @@ -0,0 +1,63 @@ +package org.apache.qpidity.transport; + +${from genutil import *} + +public enum Type +{ + +${ +types = spec.query["amqp/type"] + spec.query["amqp/class/type"] +codes = {} +first = True +for t in types: + code = t["@code"] + fix_width = t["@fixed-width"] + var_width = t["@variable-width"] + + if code is None: + continue + + if fix_width is None: + width = var_width + fixed = "false" + else: + width = fix_width + fixed = "true" + + name = scream(t["@name"]) + codes[code] = name + + if first: + first = False + else: + out(",\n") + + out(" $name((byte) $code, $width, $fixed)") +}; + + public byte code; + public int width; + public boolean fixed; + + Type(byte code, int width, boolean fixed) + { + this.code = code; + this.width = width; + this.fixed = fixed; + } + + public static Type get(byte code) + { + switch (code) + { +${ +keys = list(codes.keys()) +keys.sort() + +for code in keys: + out(" case (byte) $code: return $(codes[code]);\n") +} + default: return null; + } + } +} diff --git a/java/common/build.xml b/java/common/build.xml index 6c8bfbaf0f..796385eff3 100644 --- a/java/common/build.xml +++ b/java/common/build.xml @@ -34,7 +34,7 @@ <target name="check_jython_deps"> <uptodate property="jython.notRequired" targetfile="${jython.timestamp}"> - <srcfiles dir="${xml.spec.dir}" includes="amqp.0-10-preview.xml" /> + <srcfiles dir="${xml.spec.dir}" includes="amqp.0-10-qpid-errata.xml" /> </uptodate> </target> @@ -42,10 +42,9 @@ <java classname="org.python.util.jython" fork="true" failonerror="true"> <arg value="-Dpython.cachedir.skip=true"/> <arg value="-Dpython.path=${basedir}/jython-lib.jar/Lib${path.separator}${mllib.dir}"/> - <arg value="${basedir}/generate"/> + <arg value="${basedir}/codegen"/> <arg value="${module.precompiled}"/> - <arg value="org.apache.qpidity.transport"/> - <arg value="${xml.spec.dir}/amqp.0-10-preview.xml"/> + <arg value="${xml.spec.dir}/amqp.0-10-qpid-errata.xml"/> <classpath> <pathelement location="jython-2.2-rc2.jar"/> </classpath> diff --git a/java/common/codegen b/java/common/codegen new file mode 100755 index 0000000000..f5d1577774 --- /dev/null +++ b/java/common/codegen @@ -0,0 +1,63 @@ +#!/usr/bin/env python + +import os, sys, mllib +from templating import Parser +from genutil import * + +out_dir = sys.argv[1] +spec_file = sys.argv[2] +pkg_dir = os.path.join(out_dir, "org/apache/qpidity/transport") + +if not os.path.exists(pkg_dir): + os.makedirs(pkg_dir) + +spec = mllib.xml_parse(spec_file) + +def excludes(nd): + if (nd.parent is not None and + nd.parent.name == "class" and + nd.parent["@name"] in ("file", "stream")): + return False + else: + return True + +def execute(output, template, **kwargs): + f = open(template) + input = f.read() + f.close() + p = Parser(**kwargs) + p.parse(input) + fname = os.path.join(pkg_dir, output) + f = open(fname, "w") + f.write(p.output) + f.close() + +execute("Type.java", "Type.tpl", spec = spec) +execute("Constant.java", "Constant.tpl", spec = spec) + +structs = spec.query["amqp/struct"] + \ + spec.query["amqp/class/struct", excludes] + \ + spec.query["amqp/class/command/result/struct", excludes] +controls = spec.query["amqp/class/control", excludes] +commands = spec.query["amqp/class/command", excludes] + +composites = structs + controls + commands + +for c in composites: + name = cname(c) + execute("%s.java" % name, "Composite.tpl", type = c, name = name) + +execute("MethodDelegate.java", "MethodDelegate.tpl", composites = composites) +execute("Option.java", "Option.tpl", composites = composites) +execute("Invoker.java", "Invoker.tpl", composites = controls + commands) +execute("StructFactory.java", "StructFactory.tpl", composites = composites) + +def is_enum(nd): + return nd["enum"] is not None + +enums = spec.query["amqp/domain", is_enum] + \ + spec.query["amqp/class/domain", is_enum] + +for e in enums: + name = cname(e) + execute("%s.java" % name, "Enum.tpl", name = name, type = e) diff --git a/java/common/generate b/java/common/generate deleted file mode 100755 index daf8475c54..0000000000 --- a/java/common/generate +++ /dev/null @@ -1,567 +0,0 @@ -#!/usr/bin/python - -# Interim code generation script. - -import sys, os, mllib -from cStringIO import StringIO - -out_dir=sys.argv[1] -out_pkg = sys.argv[2] -spec_file = sys.argv[3] - -spec = mllib.xml_parse(spec_file) - -def jbool(b): - if b: - return "true" - else: - return "false" - -class Output: - - def __init__(self, dir, package, name): - self.dir = dir - self.package = package - self.name = name - self.lines = [] - - self.line("package %s;" % self.package) - self.line() - self.line("import java.util.ArrayList;") - self.line("import java.util.List;") - self.line("import java.util.Map;") - self.line("import java.util.UUID;") - self.line() - self.line("import org.apache.qpidity.transport.codec.Decoder;") - self.line("import org.apache.qpidity.transport.codec.Encodable;") - self.line("import org.apache.qpidity.transport.codec.Encoder;") - self.line() - self.line("import org.apache.qpidity.transport.network.Frame;") - self.line() - self.line() - - def line(self, l = ""): - self.lines.append(l) - - def getter(self, type, method, value, pre = None): - self.line() - self.line(" public final %s %s() {" % (type, method)) - if pre: - self.line(" %s;" % pre) - self.line(" return %s;" % value) - self.line(" }") - - def setter(self, type, method, variable, value = None, pre = None, - post = None): - if value: - params = "" - else: - params = "%s value" % type - value = "value" - - self.line() - self.line(" public final %s %s(%s) {" % (self.name, method, params)) - if pre: - self.line(" %s;" % pre) - self.line(" this.%s = %s;" % (variable, value)) - if post: - self.line(" %s;" % post) - self.line(" return this;") - self.line(" }") - - def write(self): - dir = os.path.join(self.dir, *self.package.split(".")) - if not os.path.exists(dir): - os.makedirs(dir) - file = os.path.join(dir, "%s.java" % self.name) - out = open(file, "w") - for l in self.lines: - out.write(l) - out.write(os.linesep) - out.close() - -TYPES = { - "longstr": "String", - "shortstr": "String", - "longlong": "long", - "long": "long", - "short": "int", - "octet": "short", - "bit": "boolean", - "table": "Map<String,Object>", - "timestamp": "long", - "content": "String", - "uuid": "UUID", - "rfc1982-long-set": "RangeSet", - "long-struct": "Struct", - "signed-byte": "byte", - "unsigned-byte": "short", - "char": "char", - "boolean": "boolean", - "two-octets": "short", - "signed-short": "short", - "unsigned-short": "int", - "four-octets": "int", - "signed-int": "int", - "unsigned-int": "long", - "float": "float", - "utf32-char": "char", - "eight-octets": "long", - "signed-long": "long", - "unsigned-long": "long", - "double": "double", - "datetime": "long", - "sixteen-octets": "byte[]", - "thirty-two-octets": "byte[]", - "sixty-four-octets": "byte[]", - "_128-octets": "byte[]", - "short-binary": "byte[]", - "short-string": "String", - "short-utf8-string": "String", - "short-utf16-string": "String", - "short-utf32-string": "String", - "binary": "byte[]", - "string": "String", - "utf8-string": "String", - "utf16-string": "String", - "utf32-string": "String", - "long-binary": "byte[]", - "long-string": "String", - "long-utf8-string": "String", - "long-utf16-string": "String", - "long-utf32-string": "String", - "sequence": "List<Object>", - "array": "List<Object>", - "five-octets": "byte[]", - "decimal": "byte[]", - "nine-octets": "byte[]", - "long-decimal": "byte[]", - "void": "Void" - } - -DEFAULTS = { - "longlong": "0", - "long": "0", - "short": "0", - "octet": "0", - "timestamp": "0", - "bit": "false" - } - -TRACKS = { - "connection": "Frame.L1", - "session": "Frame.L2", - "execution": "Frame.L3", - None: None - } - -def camel(offset, *args): - parts = [] - for a in args: - parts.extend(a.split("-")) - return "".join(parts[:offset] + [p[0].upper() + p[1:] for p in parts[offset:]]) - -def dromedary(s): - return s[0].lower() + s[1:] - -def scream(*args): - return "_".join([a.replace("-", "_").upper() for a in args]) - - -types = Output(out_dir, out_pkg, "Type") -types.line("public enum Type") -types.line("{") -codes = {} -for c in spec.query["amqp/constant"]: - if c["@class"] == "field-table-type": - name = c["@name"] - if name.startswith("field-table-"): - name = name[12:] - if name[0].isdigit(): - name = "_" + name - val = c["@value"] - codes[val] = name - if c["@width"] != None: - width = c["@width"] - fixed = "true" - if c["@lfwidth"] != None: - width = c["@lfwidth"] - fixed = "false" - types.line(" %s((byte) %s, %s, %s)," % - (scream(name), val, width, fixed)) -types.line(" ;") - -types.line(" public byte code;") -types.line(" public int width;") -types.line(" public boolean fixed;") - -types.line(" Type(byte code, int width, boolean fixed)") -types.line(" {") -for arg in ("code", "width", "fixed"): - types.line(" this.%s = %s;" % (arg, arg)) -types.line(" }") - -types.line(" public static Type get(byte code)") -types.line(" {") -types.line(" switch (code)") -types.line(" {") -for code, name in codes.items(): - types.line(" case (byte) %s: return %s;" % (code, scream(name))) -types.line(" default: return null;") -types.line(" }") -types.line(" }") - -types.line("}") -types.write() - - -const = Output(out_dir, out_pkg, "Constant") -const.line("public interface Constant") -const.line("{") -for d in spec.query["amqp/constant"]: - name = d["@name"] - val = d["@value"] - datatype = d["@datatype"] - if datatype == None: - const.line("public static final int %s = %s;" % (scream(name), val)) -const.line("}") -const.write() - - -DOMAINS = {} -STRUCTS = {} - -for d in spec.query["amqp/domain"]: - name = d["@name"] - type = d["@type"] - if type != None: - DOMAINS[name] = d["@type"] - elif d["struct"] != None: - DOMAINS[name] = name - STRUCTS[name] = camel(0, name) - -def resolve(type): - if DOMAINS.has_key(type) and DOMAINS[type] != type: - return resolve(DOMAINS[type]) - else: - return type - -def jtype(type): - if STRUCTS.has_key(type): - return STRUCTS[type] - else: - return TYPES[type] - -def jclass(jt): - idx = jt.find('<') - if idx > 0: - return jt[:idx] - else: - return jt - -REFS = { - "boolean": "Boolean", - "byte": "Byte", - "short": "Short", - "int": "Integer", - "long": "Long", - "float": "Float", - "double": "Double", - "char": "Character" -} - -def jref(jt): - return REFS.get(jt, jt) - - -OPTIONS = {} - -class Struct: - - def __init__(self, node, name, base, type, size, pack, track, content): - self.node = node - self.name = name - self.base = base - self.type = type - self.size = size - self.pack = pack - self.track = track - self.content = content - self.fields = [] - self.ticket = False - - def result(self): - r = self.node["result"] - if not r: return - name = r["@domain"] - if not name: - name = self.name + "Result" - else: - name = camel(0, name) - return name - - def field(self, type, name): - if name == "ticket": - self.ticket = True - else: - self.fields.append((type, name)) - - def impl(self, out): - out.line("public class %s extends %s {" % (self.name, self.base)) - - out.line() - out.line(" public static final int TYPE = %d;" % self.type) - out.getter("int", "getStructType", "TYPE") - out.getter("int", "getSizeWidth", self.size) - out.getter("int", "getPackWidth", self.pack) - out.getter("boolean", "hasTicket", jbool(self.ticket)) - - if self.base == "Method": - out.getter("boolean", "hasPayload", jbool(self.content)) - out.getter("byte", "getEncodedTrack", self.track) - - out.line() - out.line(" private static final List<Field<?,?>> FIELDS = new ArrayList<Field<?,?>>();") - out.line(" public List<Field<?,?>> getFields() { return FIELDS; }") - out.line() - - out.line() - for type, name in self.fields: - out.line(" private boolean has_%s;" % name) - out.line(" private %s %s;" % (jtype(type), name)) - - if self.fields: - out.line() - out.line(" public %s() {}" % self.name) - - out.line() - out.line(" public %s(%s) {" % (self.name, self.parameters())) - opts = False - for type, name in self.fields: - if not OPTIONS.has_key(name): - out.line(" %s(%s);" % (camel(1, "set", name), name)) - else: - opts = True - if opts: - for type, name in self.fields: - if OPTIONS.has_key(name): - out.line(" boolean _%s = false;" % name) - out.line(" for (int i=0; i < _options.length; i++) {") - out.line(" switch (_options[i]) {") - for type, name in self.fields: - if OPTIONS.has_key(name): - out.line(" case %s: _%s=true; break;" % (OPTIONS[name], name)) - out.line(" case NO_OPTION: break;") - out.line(' default: throw new IllegalArgumentException' - '("invalid option: " + _options[i]);') - out.line(" }") - out.line(" }") - for type, name in self.fields: - if OPTIONS.has_key(name): - out.line(" %s(_%s);" % (camel(1, "set", name), name)) - out.line(" }") - - out.line() - out.line(" public <C> void dispatch(C context, MethodDelegate<C> delegate) {") - out.line(" delegate.%s(context, this);" % dromedary(self.name)) - out.line(" }") - - index = 0 - for type, name in self.fields: - out.getter("boolean", camel(1, "has", name), "has_" + name) - out.setter("boolean", camel(1, "clear", name), "has_" + name, "false", - post = "this.%s = %s; this.dirty = true" % (name, DEFAULTS.get(type, "null"))) - out.getter(jtype(type), camel(1, "get", name), name) - for mname in (camel(1, "set", name), name): - out.setter(jtype(type), mname, name, - post = "this.has_%s = true; this.dirty = true" % name) - - out.line() - out.line(' static {') - ftype = jref(jclass(jtype(type))) - out.line(' FIELDS.add(new Field<%s,%s>(%s.class, %s.class, "%s", %d) {' % - (self.name, ftype, self.name, ftype, name, index)) - out.line(' public boolean has(Object struct) {') - out.line(' return check(struct).has_%s;' % name) - out.line(' }') - out.line(' public void has(Object struct, boolean value) {') - out.line(' check(struct).has_%s = value;' % name) - out.line(' }') - out.line(' public %s get(Object struct) {' % ftype) - out.line(' return check(struct).%s();' % camel(1, "get", name)) - out.line(' }') - out.line(' public void read(Decoder dec, Object struct) {') - if TYPES.has_key(type): - out.line(' check(struct).%s = dec.read%s();' % (name, camel(0, type))) - elif STRUCTS.has_key(type): - out.line(' check(struct).%s = (%s) dec.readStruct(%s.TYPE);' % - (name, STRUCTS[type], STRUCTS[type])) - else: - raise Exception("unknown type: %s" % type) - out.line(' check(struct).dirty = true;') - out.line(' }') - out.line(' public void write(Encoder enc, Object struct) {') - if TYPES.has_key(type): - out.line(' enc.write%s(check(struct).%s);' % (camel(0, type), name)) - elif STRUCTS.has_key(type): - out.line(' enc.writeStruct(%s.TYPE, check(struct).%s);' % - (STRUCTS[type], name)) - else: - raise Exception("unknown type: %s" % type) - out.line(' }') - out.line(' });') - out.line(' }') - index += 1; - - out.line("}") - - - def parameters(self): - params = [] - var = False - for type, name in self.fields: - if OPTIONS.has_key(name): - var = True - else: - params.append("%s %s" % (jtype(type), name)) - if var: - params.append("Option ... _options") - return ", ".join(params) - - def arguments(self): - args = [] - var = False - for type, name in self.fields: - if OPTIONS.has_key(name): - var = True - else: - args.append(name) - if var: - args.append("_options") - return ", ".join(args) - -CLASSES = {"file": False, "basic": False, "stream": False, "tunnel": False} - -PACK_WIDTHS = { - None: 2, - "octet": 1, - "short": 2, - "long": 4 - } - -SIZE_WIDTHS = PACK_WIDTHS.copy() -SIZE_WIDTHS[None] = 0 - -class Visitor(mllib.transforms.Visitor): - - def __init__(self): - self.structs = [] - self.untyped = -1 - - def do_method(self, m): - if CLASSES.get(m.parent["@name"], True): - name = camel(0, m.parent["@name"], m["@name"]) - type = int(m.parent["@index"])*256 + int(m["@index"]) - self.structs.append((name, "Method", type, 0, 2, m)) - self.descend(m) - - def do_domain(self, d): - s = d["struct"] - if s: - name = camel(0, d["@name"]) - st = s["@type"] - if st in (None, "none", ""): - type = self.untyped - self.untyped -= 1 - else: - type = int(st) - self.structs.append((name, "Struct", type, SIZE_WIDTHS[s["@size"]], - PACK_WIDTHS[s["@pack"]], s)) - self.descend(d) - - def do_result(self, r): - s = r["struct"] - if s: - name = camel(0, r.parent.parent["@name"], r.parent["@name"], "Result") - type = int(r.parent.parent["@index"]) * 256 + int(s["@type"]) - self.structs.append((name, "Result", type, SIZE_WIDTHS[s["@size"]], - PACK_WIDTHS[s["@pack"]], s)) - self.descend(r) - -v = Visitor() -spec.dispatch(v) - -opts = Output(out_dir, out_pkg, "Option") -opts.line("public enum Option {") -structs = [] -for name, base, typecode, size, pack, m in v.structs: - struct = Struct(m, name, base, typecode, size, pack, - TRACKS.get(m.parent["@name"], "Frame.L4"), - m["@content"] == "1") - for f in m.query["field"]: - type = resolve(f["@domain"]) - name = camel(1, f["@name"]) - struct.field(type, name) - if type == "bit": - opt_name = scream(f["@name"]) - if not OPTIONS.has_key(name): - OPTIONS[name] = opt_name - opts.line(" %s," % opt_name) - structs.append(struct) -opts.line(" %s," % "NO_OPTION") -opts.line("}") -opts.write() - - - - -for s in structs: - impl = Output(out_dir, out_pkg, s.name) - s.impl(impl) - impl.write() - -fct = Output(out_dir, out_pkg, "StructFactory") -fct.line("class StructFactory {") -fct.line(" public static Struct create(int type) {") -fct.line(" switch (type) {") -for s in structs: - fct.line(" case %s.TYPE:" % s.name) - fct.line(" return new %s();" % s.name) -fct.line(" default:") -fct.line(' throw new IllegalArgumentException("type: " + type);') -fct.line(" }") -fct.line(" }") -fct.line("}"); -fct.write() - -dlg = Output(out_dir, out_pkg, "MethodDelegate") -dlg.line("public abstract class MethodDelegate<C> {") -for s in structs: - dlg.line(" public void %s(C context, %s struct) {}" % - (dromedary(s.name), s.name)) -dlg.line("}") -dlg.write() - -inv = Output(out_dir, out_pkg, "Invoker") -inv.line("public abstract class Invoker {") -inv.line() -inv.line(" protected abstract void invoke(Method method);") -inv.line(" protected abstract <T> Future<T> invoke(Method method, Class<T> resultClass);") -inv.line() -for s in structs: - if s.base != "Method": continue - dname = dromedary(s.name) - result = s.result() - if result: - result_type = "Future<%s>" % result - else: - result_type = "void" - inv.line(" public %s %s(%s) {" % (result_type, dname, s.parameters())) - if result: - inv.line(" return invoke(new %s(%s), %s.class);" % - (s.name, s.arguments(), result)) - else: - inv.line(" invoke(new %s(%s));" % (s.name, s.arguments())) - inv.line(" }") -inv.line("}") -inv.write() diff --git a/java/common/genutil.py b/java/common/genutil.py new file mode 100644 index 0000000000..5206b50bbd --- /dev/null +++ b/java/common/genutil.py @@ -0,0 +1,207 @@ + +def camel(offset, *args): + parts = [] + for a in args: + parts.extend(a.split("-")) + return "".join(parts[:offset] + [p[0].upper() + p[1:] for p in parts[offset:]]) + +def dromedary(s): + return s[0].lower() + s[1:] + +def scream(*args): + return "_".join([a.replace("-", "_").upper() for a in args]) + +def num(x): + if x is not None and x != "": + return int(x, 0) + else: + return None + +def klass(nd): + parent = nd.parent + while parent is not None: + if hasattr(parent, "name") and parent.name == "class": + return parent + parent = parent.parent + +untyped = -1 + +def code(nd): + global untyped + cd = num(nd["@code"]) + if cd is None: + cd = untyped + untyped -= 1 + return cd + + cls = klass(nd) + if cls: + cd |= (num(cls["@code"]) << 8) + return cd + +def root(nd): + if nd.parent is None: + return nd + else: + return root(nd.parent) + +def qname(nd): + name = nd["@name"] + cls = klass(nd) + if cls != None: + return "%s.%s" % (cls["@name"], name) + else: + return name + +def resolve(node, name): + spec = root(node) + cls = klass(node) + if cls: + for nd in cls.query["#tag"]: + if nd["@name"] == name: + return nd + for nd in spec.query["amqp/#tag"] + spec.query["amqp/class/#tag"]: + if name == qname(nd): + return nd + raise Exception("unresolved name: %s" % name) + +def resolve_type(nd): + name = nd["@type"] + type = resolve(nd, name) + if type.name == "domain" and not type["enum"]: + return resolve_type(type) + else: + return type + +TYPES = { + "bit": "boolean", + "uint8": "short", + "uint16": "int", + "uint32": "long", + "uint64": "long", + "datetime": "long", + "uuid": "UUID", + "sequence-no": "int", + "sequence-set": "RangeSet", # XXX + "byte-ranges": "RangeSet", # XXX + "str8": "String", + "str16": "String", + "vbin8": "byte[]", + "vbin16": "byte[]", + "vbin32": "byte[]", + "struct32": "Struct", + "map": "Map<String,Object>", + "array": "List<Object>" + } + +def cname(nd, field="@name"): + cls = klass(nd) + if cls: + if (nd.name in ("struct", "result") and + cls["@name"] != "session" and + nd[field] != "header"): + return camel(0, nd[field]) + else: + return camel(0, cls["@name"], nd[field]) + else: + return camel(0, nd[field]) + +def jtype(nd): + if nd.name == "struct" or nd["enum"]: + return cname(nd) + else: + return TYPES[nd["@name"]] + +REFS = { + "boolean": "Boolean", + "byte": "Byte", + "short": "Short", + "int": "Integer", + "long": "Long", + "float": "Float", + "double": "Double", + "char": "Character" +} + +def jref(jt): + return REFS.get(jt, jt) + +def jclass(jt): + idx = jt.find('<') + if idx > 0: + return jt[:idx] + else: + return jt + +DEFAULTS = { + "long": 0, + "int": 0, + "short": 0, + "byte": 0, + "char": 0, + "boolean": "false" + } + +class Field: + + def __init__(self, index, nd): + self.index = index + self.name = camel(1, nd["@name"]) + type_node = resolve_type(nd) + tname = cname(type_node) + if type_node.name == "struct": + self.read = "(%s) dec.readStruct(%s.TYPE)" % (tname, tname) + self.write = "enc.writeStruct(%s.TYPE, check(struct).%s)" % (tname, self.name) + elif type_node.name == "domain": + coder = camel(0, resolve_type(type_node)["@name"]) + self.read = "%s.get(dec.read%s())" % (tname, coder) + self.write = "enc.write%s(check(struct).%s.getValue())" % (coder, self.name) + else: + coder = camel(0, type_node["@name"]) + self.read = "dec.read%s()" % coder + self.write = "enc.write%s(check(struct).%s)" % (coder, self.name) + self.type = jtype(type_node) + self.default = DEFAULTS.get(self.type, "null") + self.has = camel(1, "has", self.name) + self.get = camel(1, "get", self.name) + self.set = camel(1, "set", self.name) + self.clear = camel(1, "clear", self.name) + if self.type == "boolean": + self.option = scream(nd["@name"]) + else: + self.option = None + +def get_fields(nd): + fields = [] + index = 0 + for f in nd.query["field"]: + fields.append(Field(index, f)) + index += 1 + return fields + +def get_parameters(fields): + params = [] + options = False + for f in fields: + if f.option: + options = True + else: + params.append("%s %s" % (f.type, f.name)) + if options: + params.append("Option ... _options") + return params + +def get_arguments(fields): + args = [] + options = False + for f in fields: + if f.option: + options = True + else: + args.append(f.name) + if options: + args.append("_options") + return args + +def get_options(fields): + return [f for f in fields if f.option] diff --git a/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java b/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java index d0b3272dec..a72997813a 100644 --- a/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java +++ b/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java @@ -22,6 +22,7 @@ package org.apache.qpidity; import java.io.UnsupportedEncodingException; import java.util.HashSet; +import java.util.List; import java.util.StringTokenizer; import org.apache.qpidity.security.AMQPCallbackHandler; @@ -29,13 +30,12 @@ import org.apache.qpidity.security.CallbackHandlerRegistry; public class SecurityHelper { - public static String chooseMechanism(String mechanisms) throws UnsupportedEncodingException + public static String chooseMechanism(List<Object> mechanisms) throws UnsupportedEncodingException { - StringTokenizer tokenizer = new StringTokenizer(mechanisms, " "); HashSet mechanismSet = new HashSet(); - while (tokenizer.hasMoreTokens()) + for (Object m : mechanisms) { - mechanismSet.add(tokenizer.nextToken()); + mechanismSet.add(m); } String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms(); diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java index 2bd97f3aff..5f9917e30a 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java @@ -59,7 +59,7 @@ class ToyBroker extends SessionDelegate public void messageAcquire(Session context, MessageAcquire struct) { System.out.println("\n==================> messageAcquire " ); - context.messageAcquired(struct.getTransfers()); + context.executionResult((int) struct.getId(), new Acquired(struct.getTransfers())); } @Override public void queueDeclare(Session ssn, QueueDeclare qd) @@ -68,16 +68,16 @@ class ToyBroker extends SessionDelegate System.out.println("\n==================> declared queue: " + qd.getQueue() + "\n"); } - @Override public void queueBind(Session ssn, QueueBind qb) + @Override public void exchangeBind(Session ssn, ExchangeBind qb) { - exchange.bindQueue(qb.getExchange(), qb.getRoutingKey(),qb.getQueue()); - System.out.println("\n==================> bound queue: " + qb.getQueue() + " with routing key " + qb.getRoutingKey() + "\n"); + exchange.bindQueue(qb.getExchange(), qb.getBindingKey(),qb.getQueue()); + System.out.println("\n==================> bound queue: " + qb.getQueue() + " with binding key " + qb.getBindingKey() + "\n"); } @Override public void queueQuery(Session ssn, QueueQuery qq) { QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue()); - ssn.executionResult(qq.getId(), result); + ssn.executionResult((int) qq.getId(), result); } @Override public void messageSubscribe(Session ssn, MessageSubscribe ms) @@ -112,7 +112,8 @@ class ToyBroker extends SessionDelegate { if (xfr == null || body == null) { - ssn.connectionClose(503, "no method segment", 0, 0); + ssn.connectionClose(ConnectionCloseCode.FRAMING_ERROR, + "no method segment"); ssn.close(); return; } @@ -136,7 +137,7 @@ class ToyBroker extends SessionDelegate { if (xfr == null || body == null) { - ssn.connectionClose(503, "no method segment", 0, 0); + ssn.connectionClose(ConnectionCloseCode.FRAMING_ERROR, "no method segment"); ssn.close(); return; } @@ -174,14 +175,16 @@ class ToyBroker extends SessionDelegate { RangeSet ranges = new RangeSet(); ranges.add(xfr.getId()); - ssn.messageReject(ranges, 0, "no such destination"); + ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE, + "no such destination"); } } private void transferMessageToPeer(Session ssn,String dest, Message m) { System.out.println("\n==================> Transfering message to: " +dest + "\n"); - ssn.messageTransfer(dest, (short)0, (short)0); + ssn.messageTransfer(dest, MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED); ssn.header(m.header); for (Data d : m.body) { diff --git a/java/common/src/main/java/org/apache/qpidity/ToyClient.java b/java/common/src/main/java/org/apache/qpidity/ToyClient.java index 10b68bbb20..a3233afcbe 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyClient.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyClient.java @@ -63,7 +63,7 @@ class ToyClient extends SessionDelegate public static final void main(String[] args) { Connection conn = MinaHandler.connect("0.0.0.0", 5672, - new ConnectionDelegate() + new ClientDelegate() { public SessionDelegate getSessionDelegate() { @@ -80,9 +80,9 @@ class ToyClient extends SessionDelegate TransportConstants.getVersionMinor()))); Channel ch = conn.getChannel(0); - Session ssn = new Session(); + Session ssn = new Session("my-session".getBytes()); ssn.attach(ch); - ssn.sessionOpen(1234); + ssn.sessionAttach(ssn.getName()); ssn.queueDeclare("asdf", null, null); ssn.sync(); @@ -111,13 +111,15 @@ class ToyClient extends SessionDelegate map.put("list", Arrays.asList(1, 2, 3)); map.put("binary", new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}); - ssn.messageTransfer("asdf", (short) 0, (short) 1); + ssn.messageTransfer("asdf", MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED); ssn.header(new DeliveryProperties(), new MessageProperties().setApplicationHeaders(map)); ssn.data("this is the data"); ssn.endData(); - ssn.messageTransfer("fdsa", (short) 0, (short) 1); + ssn.messageTransfer("fdsa", MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED); ssn.data("this should be rejected"); ssn.endData(); ssn.sync(); diff --git a/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java b/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java index 224917ade1..89d7e7917f 100644 --- a/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java +++ b/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java @@ -241,28 +241,10 @@ public class XidImpl implements Xid * @return The String representation of this Xid * @throws QpidException In case of problem when converting this Xid into a string. */ - public static String convertToString(Xid xid) throws QpidException + public static org.apache.qpidity.transport.Xid convert(Xid xid) throws QpidException { - if (_logger.isDebugEnabled()) - { - _logger.debug("converting " + xid + " into a String"); - } - try - { - ByteArrayOutputStream res = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputStream(res); - out.writeLong(xid.getFormatId()); - byte[] txId = xid.getGlobalTransactionId(); - byte[] brId = xid.getBranchQualifier(); - out.writeByte(txId.length); - out.writeByte(brId.length); - out.write(txId); - out.write(brId); - return res.toString(); - } - catch (IOException e) - { - throw new QpidException("cannot convert the xid " + xid + " into a String", null, e); - } + return new org.apache.qpidity.transport.Xid(xid.getFormatId(), + xid.getGlobalTransactionId(), + xid.getBranchQualifier()); } } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Channel.java b/java/common/src/main/java/org/apache/qpidity/transport/Channel.java index 7327697088..651402bcb7 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Channel.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Channel.java @@ -77,7 +77,7 @@ public class Channel extends Invoker connection.getConnectionDelegate().init(this, hdr); } - public void method(Void v, Method method) + public void control(Void v, Method method) { switch (method.getEncodedTrack()) { @@ -90,15 +90,17 @@ public class Channel extends Invoker case L3: method.delegate(session, sessionDelegate); break; - case L4: - method.delegate(session, sessionDelegate); - break; default: throw new IllegalStateException ("unknown track: " + method.getEncodedTrack()); } } + public void command(Void v, Method method) + { + method.delegate(session, sessionDelegate); + } + public void header(Void v, Header header) { header.delegate(session, sessionDelegate); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java index 84621fbe25..7f4365f515 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java @@ -32,15 +32,14 @@ import java.util.UUID; class ChannelDelegate extends MethodDelegate<Channel> { - public @Override void sessionOpen(Channel channel, SessionOpen open) + public @Override void sessionAttach(Channel channel, SessionAttach atch) { - Session ssn = new Session(); + Session ssn = new Session(atch.getName()); ssn.attach(channel); - long lifetime = open.getDetachedLifetime(); - ssn.sessionAttached(UUID.randomUUID(), lifetime); + ssn.sessionAttached(ssn.getName()); } - public @Override void sessionClosed(Channel channel, SessionClosed closed) + public @Override void sessionDetached(Channel channel, SessionDetached closed) { channel.getSession().closed(); // XXX: should we remove the channel from the connection? It diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java new file mode 100644 index 0000000000..699854fb3b --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity.transport; + + +/** + * ClientDelegate + * + */ + +public abstract class ClientDelegate extends ConnectionDelegate +{ + + public void init(Channel ch, ProtocolHeader hdr) + { + if (hdr.getMajor() != TransportConstants.getVersionMajor() && + hdr.getMinor() != TransportConstants.getVersionMinor()) + { + throw new RuntimeException("version missmatch: " + hdr); + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java index 4815f1025f..cb5f05a185 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java @@ -26,7 +26,10 @@ import org.apache.qpidity.SecurityHelper; import org.apache.qpidity.QpidException; import java.io.UnsupportedEncodingException; + +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -79,17 +82,27 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> public void init(Channel ch, ProtocolHeader hdr) { - // XXX: hardcoded version - if (hdr.getMajor() != 0 && hdr.getMinor() != 10) + ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader + (1, + TransportConstants.getVersionMajor(), + TransportConstants.getVersionMinor()))); + if (hdr.getMajor() != TransportConstants.getVersionMajor() && + hdr.getMinor() != TransportConstants.getVersionMinor()) { // XXX - ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader(1, TransportConstants.getVersionMajor(), - TransportConstants.getVersionMinor()))); + ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader + (1, + TransportConstants.getVersionMajor(), + TransportConstants.getVersionMinor()))); ch.getConnection().close(); } else { - ch.connectionStart(hdr.getMajor(), hdr.getMinor(), null, "PLAIN", "utf8"); + List<Object> plain = new ArrayList<Object>(); + plain.add("PLAIN"); + List<Object> utf8 = new ArrayList<Object>(); + utf8.add("utf8"); + ch.connectionStart(null, plain, utf8); } } @@ -99,13 +112,13 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> @Override public void connectionStart(Channel context, ConnectionStart struct) { String mechanism = null; - String response = null; + byte[] response = null; try { mechanism = SecurityHelper.chooseMechanism(struct.getMechanisms()); saslClient = Sasl.createSaslClient(new String[]{ mechanism },null, "AMQP", "localhost", null, SecurityHelper.createCallbackHandler(mechanism,_username,_password )); - response = new String(saslClient.evaluateChallenge(new byte[0]),_locale); + response = saslClient.evaluateChallenge(new byte[0]); } catch (UnsupportedEncodingException e) { @@ -128,13 +141,9 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> { try { - String response = new String(saslClient.evaluateChallenge(struct.getChallenge().getBytes()),_locale); + byte[] response = saslClient.evaluateChallenge(struct.getChallenge()); context.connectionSecureOk(response); } - catch (UnsupportedEncodingException e) - { - // need error handling - } catch (SaslException e) { // need error handling @@ -144,14 +153,14 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> @Override public void connectionTune(Channel context, ConnectionTune struct) { // should update the channel max given by the broker. - context.connectionTuneOk(struct.getChannelMax(), struct.getFrameMax(), struct.getHeartbeat()); + context.connectionTuneOk(struct.getChannelMax(), struct.getMaxFrameSize(), struct.getHeartbeatMax()); context.connectionOpen(_virtualHost, null, Option.INSIST); } @Override public void connectionOpenOk(Channel context, ConnectionOpenOk struct) { - String knownHosts = struct.getKnownHosts(); + List<Object> knownHosts = struct.getKnownHosts(); if(_negotiationCompleteLock != null) { _negotiationCompleteLock.lock(); @@ -187,13 +196,13 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> byte[] challenge = null; if ( challenge == null) { - context.connectionTune(Integer.MAX_VALUE,maxFrame, 0); + context.connectionTune(Integer.MAX_VALUE, maxFrame, 0, Integer.MAX_VALUE); } else { try { - context.connectionSecure(new String(challenge,_locale)); + context.connectionSecure(challenge); } catch(Exception e) { @@ -218,16 +227,16 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> try { saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",new HashMap(),SecurityHelper.createCallbackHandler(_mechanism,_username,_password)); - byte[] challenge = saslServer.evaluateResponse(struct.getResponse().getBytes()); + byte[] challenge = saslServer.evaluateResponse(struct.getResponse()); if ( challenge == null) { - context.connectionTune(Integer.MAX_VALUE,maxFrame, 0); + context.connectionTune(Integer.MAX_VALUE, maxFrame, 0, Integer.MAX_VALUE); } else { try { - context.connectionSecure(new String(challenge,_locale)); + context.connectionSecure(challenge); } catch(Exception e) { @@ -250,8 +259,9 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> @Override public void connectionOpen(Channel context, ConnectionOpen struct) { - String hosts = "amqp:1223243232325"; - context.connectionOpenOk(hosts); + List<Object> hosts = new ArrayList<Object>(); + hosts.add("amqp:1223243232325"); + context.connectionOpenOk(hosts); } public String getPassword() diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Data.java b/java/common/src/main/java/org/apache/qpidity/transport/Data.java index b9b8636d18..72c1c3331d 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Data.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Data.java @@ -93,7 +93,7 @@ public class Data implements ProtocolEvent { str.append(" | "); } - str.append(str(buf, 20)); + str.append(str(buf)); } str.append(")"); return str.toString(); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Method.java b/java/common/src/main/java/org/apache/qpidity/transport/Method.java index 7304260333..bc4ec6289d 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Method.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Method.java @@ -20,6 +20,7 @@ */ package org.apache.qpidity.transport; +import org.apache.qpidity.transport.network.Frame; /** * Method @@ -34,11 +35,12 @@ public abstract class Method extends Struct implements ProtocolEvent { // XXX: should generate separate factories for separate // namespaces - return (Method) Struct.create(type); + return (Method) StructFactory.createInstruction(type); } // XXX: command subclass? private long id; + private boolean sync = false; public final long getId() { @@ -50,6 +52,16 @@ public abstract class Method extends Struct implements ProtocolEvent this.id = id; } + public final boolean isSync() + { + return sync; + } + + void setSync(boolean value) + { + this.sync = value; + } + public abstract boolean hasPayload(); public abstract byte getEncodedTrack(); @@ -58,7 +70,14 @@ public abstract class Method extends Struct implements ProtocolEvent public <C> void delegate(C context, ProtocolDelegate<C> delegate) { - delegate.method(context, this); + if (getEncodedTrack() == Frame.L4) + { + delegate.command(context, this); + } + else + { + delegate.control(context, this); + } } } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java index 6149c3876a..028d570416 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java @@ -31,7 +31,9 @@ public interface ProtocolDelegate<C> void init(C context, ProtocolHeader header); - void method(C context, Method method); + void control(C context, Method control); + + void command(C context, Method command); void header(C context, Header header); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java b/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java index 97d4be0d2e..ccbcfa99d0 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java @@ -98,6 +98,13 @@ public class RangeSet implements Iterable<Range> ranges.clear(); } + public RangeSet copy() + { + RangeSet copy = new RangeSet(); + copy.ranges.addAll(ranges); + return copy; + } + public String toString() { StringBuffer str = new StringBuffer(); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/java/common/src/main/java/org/apache/qpidity/transport/Session.java index e4f4af95a5..f6f4062c6d 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Session.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.qpidity.transport.Option.*; /** * Session @@ -56,23 +57,34 @@ public class Session extends Invoker private static boolean ENABLE_REPLAY = false; private static final Logger log = Logger.get(Session.class); + private byte[] name; + private long timeout = 60000; + // channel may be null Channel channel; // incoming command count - private long commandsIn = 0; + long commandsIn = 0; // completed incoming commands private final RangeSet processed = new RangeSet(); - private long processedMark = -1; private Range syncPoint = null; // outgoing command count private long commandsOut = 0; private Map<Long,Method> commands = new HashMap<Long,Method>(); - private long mark = 0; + private long maxComplete = -1; private AtomicBoolean closed = new AtomicBoolean(false); + public Session(byte[] name) + { + this.name = name; + } + + public byte[] getName() + { + return name; + } public Map<Long,Method> getOutstandingCommands() { @@ -133,25 +145,12 @@ public class Session extends Invoker public void flushProcessed() { - boolean first = true; - RangeSet rest = new RangeSet(); + RangeSet copy; synchronized (processed) { - for (Range r: processed) - { - if (first && r.includes(processedMark)) - { - processedMark = r.getUpper(); - } - else - { - rest.add(r); - } - - first = false; - } + copy = processed.copy(); } - executionComplete(processedMark, rest); + sessionCompleted(copy); } void syncPoint() @@ -193,34 +192,34 @@ public class Session extends Invoker log.debug("%s complete(%d, %d)", this, lower, upper); synchronized (commands) { - for (long id = lower; id <= upper; id++) + for (long id = maxComplete; id <= upper; id++) { commands.remove(id); } + if (lower <= maxComplete + 1) + { + maxComplete = Math.max(maxComplete, upper); + } commands.notifyAll(); log.debug("%s commands remaining: %s", this, commands); } } - void complete(long mark) - { - synchronized (commands) - { - complete(this.mark, mark); - this.mark = mark; - commands.notifyAll(); - } - } - protected void invoke(Method m) { if (m.getEncodedTrack() == Frame.L4) { synchronized (commands) { - // You only need to keep the command if you need command level replay. - // If not we only need to keep track of commands to make sync work - commands.put(commandsOut++,(ENABLE_REPLAY?m:null)); + long next = commandsOut++; + if (next == 0) + { + sessionCommandPoint(0, 0); + } + if (ENABLE_REPLAY) + { + commands.put(next, m); + } channel.method(m); } } @@ -230,7 +229,7 @@ public class Session extends Invoker } } - public void header(Header header) + public void header(Header header) { channel.header(header); } @@ -269,21 +268,32 @@ public class Session extends Invoker public void sync() { + sync(timeout); + } + + public void sync(long timeout) + { log.debug("%s sync()", this); synchronized (commands) { long point = commandsOut - 1; - if (mark < point) + if (maxComplete < point) { - executionSync(); + ExecutionSync sync = new ExecutionSync(); + sync.setSync(true); + invoke(sync); } - while (!closed.get() && mark < point) + long start = System.currentTimeMillis(); + long elapsed = 0; + while (!closed.get() && elapsed < timeout && maxComplete < point) { try { - log.debug("%s waiting for[%d]: %s", this, point, commands); - commands.wait(); + log.debug("%s waiting for[%d]: %d, %s", this, point, + maxComplete, commands); + commands.wait(timeout - elapsed); + elapsed = System.currentTimeMillis() - start; } catch (InterruptedException e) { @@ -291,9 +301,16 @@ public class Session extends Invoker } } - if (mark < point) + if (maxComplete < point) { - throw new RuntimeException("session closed"); + if (closed.get()) + { + throw new RuntimeException("session closed"); + } + else + { + throw new RuntimeException("timed out waiting for sync"); + } } } } @@ -346,16 +363,19 @@ public class Session extends Invoker } } - public T get(long timeout, int nanos) + public T get(long timeout) { synchronized (this) { - while (!closed.get() && !isDone()) + long start = System.currentTimeMillis(); + long elapsed = 0; + while (!closed.get() && timeout - elapsed > 0 && !isDone()) { try { log.debug("%s waiting for result: %s", Session.this, this); - wait(timeout, nanos); + wait(timeout - elapsed); + elapsed = System.currentTimeMillis() - start; } catch (InterruptedException e) { @@ -364,22 +384,23 @@ public class Session extends Invoker } } - if (!isDone()) + if (isDone()) + { + return result; + } + else if (closed.get()) { throw new RuntimeException("session closed"); } - - return result; - } - - public T get(long timeout) - { - return get(timeout, 0); + else + { + return null; + } } public T get() { - return get(0); + return get(timeout); } public boolean isDone() @@ -396,7 +417,8 @@ public class Session extends Invoker public void close() { - sessionClose(); + sessionRequestTimeout(0); + sessionDetach(name); // XXX: channel.close(); } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java index 5e36a77a98..442aba0e9b 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java @@ -35,20 +35,16 @@ public abstract class SessionDelegate { public void init(Session ssn, ProtocolHeader hdr) { } - public void method(Session ssn, Method method) { - if (method.getEncodedTrack() == Frame.L4) - { - method.setId(ssn.nextCommandId()); - } - + public void control(Session ssn, Method method) { method.dispatch(ssn, this); + } - if (method.getEncodedTrack() == Frame.L4) + public void command(Session ssn, Method method) { + method.setId(ssn.nextCommandId()); + method.dispatch(ssn, this); + if (!method.hasPayload()) { - if (!method.hasPayload()) - { - ssn.processed(method); - } + ssn.processed(method); } } @@ -60,12 +56,12 @@ public abstract class SessionDelegate @Override public void executionResult(Session ssn, ExecutionResult result) { - ssn.result(result.getCommandId(), result.getData()); + ssn.result(result.getCommandId(), result.getValue()); } - @Override public void executionComplete(Session ssn, ExecutionComplete excmp) + @Override public void sessionCompleted(Session ssn, SessionCompleted cmp) { - RangeSet ranges = excmp.getRangedExecutionSet(); + RangeSet ranges = cmp.getCommands(); if (ranges != null) { for (Range range : ranges) @@ -73,12 +69,27 @@ public abstract class SessionDelegate ssn.complete(range.getLower(), range.getUpper()); } } - ssn.complete(excmp.getCumulativeExecutionMark()); } - @Override public void executionFlush(Session ssn, ExecutionFlush flush) + @Override public void sessionFlush(Session ssn, SessionFlush flush) + { + if (flush.getCompleted()) + { + ssn.flushProcessed(); + } + if (flush.getConfirmed()) + { + throw new Error("not implemented"); + } + if (flush.getExpected()) + { + throw new Error("not implemented"); + } + } + + @Override public void sessionCommandPoint(Session ssn, SessionCommandPoint scp) { - ssn.flushProcessed(); + ssn.commandsIn = scp.getCommandId(); } @Override public void executionSync(Session ssn, ExecutionSync sync) diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Struct.java b/java/common/src/main/java/org/apache/qpidity/transport/Struct.java index 200c3b68e3..f901f9e840 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Struct.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Struct.java @@ -71,8 +71,6 @@ public abstract class Struct implements Encodable return type; } - public abstract boolean hasTicket(); - private final boolean isBit(Field<?,?> f) { return f.getType().equals(Boolean.class); @@ -80,14 +78,7 @@ public abstract class Struct implements Encodable private final boolean packed() { - if (this instanceof Method) - { - return false; - } - else - { - return true; - } + return getPackWidth() > 0; } private final boolean encoded(Field<?,?> f) @@ -147,11 +138,6 @@ public abstract class Struct implements Encodable } } - if (hasTicket()) - { - dec.readShort(); - } - for (Field<?,?> f : fields) { if (encoded(f)) @@ -187,11 +173,6 @@ public abstract class Struct implements Encodable } } - if (hasTicket()) - { - enc.writeShort(0x0); - } - for (Field<?,?> f : fields) { if (encoded(f)) diff --git a/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java b/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java index 54429a1a4f..e9a0705de0 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java @@ -2,8 +2,9 @@ package org.apache.qpidity.transport; public class TransportConstants { - private static byte _protocol_version_minor = 0; - private static byte _protocol_version_major = 99; + + private static byte _protocol_version_minor = 10; + private static byte _protocol_version_major = 0; public static void setVersionMajor(byte value) { @@ -24,4 +25,5 @@ public class TransportConstants { return _protocol_version_minor; } + } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java index ae67483a23..0f6180f54a 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java @@ -20,7 +20,10 @@ */ package org.apache.qpidity.transport.codec; +import java.io.UnsupportedEncodingException; + import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -99,19 +102,19 @@ abstract class AbstractDecoder implements Decoder nbits = 0; } - public short readOctet() + public short readUint8() { return uget(); } - public int readShort() + public int readUint16() { int i = uget() << 8; i |= uget(); return i; } - public long readLong() + public long readUint32() { long l = uget() << 24; l |= uget() << 16; @@ -120,7 +123,12 @@ abstract class AbstractDecoder implements Decoder return l; } - public long readLonglong() + public int readSequenceNo() + { + return (int) readUint32(); + } + + public long readUint64() { long l = 0; for (int i = 0; i < 8; i++) @@ -130,31 +138,67 @@ abstract class AbstractDecoder implements Decoder return l; } - public long readTimestamp() + public long readDatetime() + { + return readUint64(); + } + + private static final String decode(byte[] bytes, String charset) { - return readLonglong(); + try + { + return new String(bytes, charset); + } + catch (UnsupportedEncodingException e) + { + throw new RuntimeException(e); + } } - public String readShortstr() + public String readStr8() + { + short size = readUint8(); + byte[] bytes = new byte[size]; + get(bytes); + return decode(bytes, "UTF-8"); + } + + public String readStr16() { - short size = readOctet(); + int size = readUint16(); byte[] bytes = new byte[size]; get(bytes); - return new String(bytes); + return decode(bytes, "UTF-8"); } - public String readLongstr() + public byte[] readVbin8() { - long size = readLong(); - byte[] bytes = new byte[(int) size]; + int size = readUint8(); + byte[] bytes = new byte[size]; get(bytes); - return new String(bytes); + return bytes; } - public RangeSet readRfc1982LongSet() + public byte[] readVbin16() { - int count = readShort()/8; + int size = readUint16(); + byte[] bytes = new byte[size]; + get(bytes); + return bytes; + } + + public byte[] readVbin32() + { + int size = (int) readUint32(); + byte[] bytes = new byte[size]; + get(bytes); + return bytes; + } + + public RangeSet readSequenceSet() + { + int count = readUint16()/8; if (count == 0) { return null; @@ -164,16 +208,21 @@ abstract class AbstractDecoder implements Decoder RangeSet ranges = new RangeSet(); for (int i = 0; i < count; i++) { - ranges.add(readLong(), readLong()); + ranges.add(readUint32(), readUint32()); } return ranges; } } + public RangeSet readByteRanges() + { + throw new Error("not implemented"); + } + public UUID readUuid() { - long msb = readLonglong(); - long lsb = readLonglong(); + long msb = readUint64(); + long lsb = readUint64(); return new UUID(msb, lsb); } @@ -194,34 +243,39 @@ abstract class AbstractDecoder implements Decoder return null; } } + if (type > 0) + { + int code = readUint16(); + assert code == type; + } st.read(this); return st; } - public Struct readLongStruct() + public Struct readStruct32() { - long size = readLong(); + long size = readUint32(); if (size == 0) { return null; } else { - int type = readShort(); + int type = readUint16(); Struct result = Struct.create(type); result.read(this); return result; } } - public Map<String,Object> readTable() + public Map<String,Object> readMap() { - long size = readLong(); + long size = readUint32(); int start = count; Map<String,Object> result = new LinkedHashMap(); while (count < start + size) { - String key = readShortstr(); + String key = readStr8(); byte code = get(); Type t = getType(code); Object value = read(t); @@ -230,9 +284,9 @@ abstract class AbstractDecoder implements Decoder return result; } - public List<Object> readSequence() + public List<Object> readList() { - long size = readLong(); + long size = readUint32(); int start = count; List<Object> result = new ArrayList(); while (count < start + size) @@ -247,10 +301,15 @@ abstract class AbstractDecoder implements Decoder public List<Object> readArray() { - long size = readLong(); + long size = readUint32(); + if (size == 0) + { + return Collections.EMPTY_LIST; + } + byte code = get(); Type t = getType(code); - long count = readLong(); + long count = readUint32(); List<Object> result = new ArrayList<Object>(); for (int i = 0; i < count; i++) @@ -291,11 +350,11 @@ abstract class AbstractDecoder implements Decoder switch (width) { case 1: - return readOctet(); + return readUint8(); case 2: - return readShort(); + return readUint16(); case 4: - return readLong(); + return readUint32(); default: throw new IllegalStateException("illegal width: " + width); } @@ -313,81 +372,72 @@ abstract class AbstractDecoder implements Decoder { switch (t) { - case OCTET: - case UNSIGNED_BYTE: - return readOctet(); - case SIGNED_BYTE: + case BIN8: + case UINT8: + return readUint8(); + case INT8: return get(); case CHAR: return (char) get(); case BOOLEAN: return get() > 0; - case TWO_OCTETS: - case UNSIGNED_SHORT: - return readShort(); + case BIN16: + case UINT16: + return readUint16(); - case SIGNED_SHORT: - return (short) readShort(); + case INT16: + return (short) readUint16(); - case FOUR_OCTETS: - case UNSIGNED_INT: - return readLong(); + case BIN32: + case UINT32: + return readUint32(); - case UTF32_CHAR: - case SIGNED_INT: - return (int) readLong(); + case CHAR_UTF32: + case INT32: + return (int) readUint32(); case FLOAT: - return Float.intBitsToFloat((int) readLong()); + return Float.intBitsToFloat((int) readUint32()); - case EIGHT_OCTETS: - case SIGNED_LONG: - case UNSIGNED_LONG: + case BIN64: + case UINT64: + case INT64: case DATETIME: - return readLonglong(); + return readUint64(); case DOUBLE: - return Double.longBitsToDouble(readLonglong()); - - case SIXTEEN_OCTETS: - case THIRTY_TWO_OCTETS: - case SIXTY_FOUR_OCTETS: - case _128_OCTETS: - case SHORT_BINARY: - case BINARY: - case LONG_BINARY: - return readBytes(t); + return Double.longBitsToDouble(readUint64()); case UUID: return readUuid(); - case SHORT_STRING: - case SHORT_UTF8_STRING: - case SHORT_UTF16_STRING: - case SHORT_UTF32_STRING: - case STRING: - case UTF8_STRING: - case UTF16_STRING: - case UTF32_STRING: - case LONG_STRING: - case LONG_UTF8_STRING: - case LONG_UTF16_STRING: - case LONG_UTF32_STRING: + case STR8: + return readStr8(); + + case STR16: + return readStr16(); + + case STR8_LATIN: + case STR8_UTF16: + case STR16_LATIN: + case STR16_UTF16: // XXX: need to do character conversion return new String(readBytes(t)); - case TABLE: - return readTable(); - case SEQUENCE: - return readSequence(); + case MAP: + return readMap(); + case LIST: + return readList(); case ARRAY: return readArray(); + case STRUCT32: + return readStruct32(); - case FIVE_OCTETS: - case DECIMAL: - case NINE_OCTETS: - case LONG_DECIMAL: + case BIN40: + case DEC32: + case BIN72: + case DEC64: // XXX: what types are we supposed to use here? return readBytes(t); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java index c2dd205d66..56b4537719 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java @@ -20,8 +20,11 @@ */ package org.apache.qpidity.transport.codec; +import java.io.UnsupportedEncodingException; + import java.nio.ByteBuffer; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -48,17 +51,17 @@ abstract class AbstractEncoder implements Encoder static { ENCODINGS.put(Boolean.class, Type.BOOLEAN); - ENCODINGS.put(String.class, Type.LONG_STRING); - ENCODINGS.put(Long.class, Type.SIGNED_LONG); - ENCODINGS.put(Integer.class, Type.SIGNED_INT); - ENCODINGS.put(Short.class, Type.SIGNED_SHORT); - ENCODINGS.put(Byte.class, Type.SIGNED_BYTE); - ENCODINGS.put(Map.class, Type.TABLE); - ENCODINGS.put(List.class, Type.SEQUENCE); + ENCODINGS.put(String.class, Type.STR16); + ENCODINGS.put(Long.class, Type.INT64); + ENCODINGS.put(Integer.class, Type.INT32); + ENCODINGS.put(Short.class, Type.INT16); + ENCODINGS.put(Byte.class, Type.INT8); + ENCODINGS.put(Map.class, Type.MAP); + ENCODINGS.put(List.class, Type.LIST); ENCODINGS.put(Float.class, Type.FLOAT); ENCODINGS.put(Double.class, Type.DOUBLE); ENCODINGS.put(Character.class, Type.CHAR); - ENCODINGS.put(byte[].class, Type.LONG_BINARY); + ENCODINGS.put(byte[].class, Type.VBIN32); } protected Sizer sizer() @@ -120,14 +123,14 @@ abstract class AbstractEncoder implements Encoder flushBits(); } - public void writeOctet(short b) + public void writeUint8(short b) { assert b < 0x100; put((byte) b); } - public void writeShort(int s) + public void writeUint16(int s) { assert s < 0x10000; @@ -135,7 +138,7 @@ abstract class AbstractEncoder implements Encoder put(lsb(s)); } - public void writeLong(long i) + public void writeUint32(long i) { assert i < 0x100000000L; @@ -145,7 +148,12 @@ abstract class AbstractEncoder implements Encoder put(lsb(i)); } - public void writeLonglong(long l) + public void writeSequenceNo(int i) + { + writeUint32(i); + } + + public void writeUint64(long l) { for (int i = 0; i < 8; i++) { @@ -154,47 +162,101 @@ abstract class AbstractEncoder implements Encoder } - public void writeTimestamp(long l) + public void writeDatetime(long l) + { + writeUint64(l); + } + + private static final String checkLength(String s, int n) + { + if (s == null) + { + return ""; + } + + if (s.length() > n) + { + throw new IllegalArgumentException("string too long: " + s); + } + else + { + return s; + } + } + + private static final byte[] encode(String s, String charset) + { + try + { + return s.getBytes(charset); + } + catch (UnsupportedEncodingException e) + { + throw new RuntimeException(e); + } + } + + public void writeStr8(String s) { - writeLonglong(l); + s = checkLength(s, 255); + writeUint8((short) s.length()); + put(ByteBuffer.wrap(encode(s, "UTF-8"))); } + public void writeStr16(String s) + { + s = checkLength(s, 65535); + writeUint16(s.length()); + put(ByteBuffer.wrap(encode(s, "UTF-8"))); + } - public void writeShortstr(String s) + public void writeVbin8(byte[] bytes) { - if (s == null) { s = ""; } - if (s.length() > 255) { - throw new IllegalArgumentException(s); + if (bytes == null) { bytes = new byte[0]; } + if (bytes.length > 255) + { + throw new IllegalArgumentException("array too long: " + bytes.length); } - writeOctet((short) s.length()); - put(ByteBuffer.wrap(s.getBytes())); + writeUint8((short) bytes.length); + put(ByteBuffer.wrap(bytes)); } - public void writeLongstr(String s) + public void writeVbin16(byte[] bytes) { - if (s == null) { s = ""; } - writeLong(s.length()); - put(ByteBuffer.wrap(s.getBytes())); + if (bytes == null) { bytes = new byte[0]; } + writeUint16(bytes.length); + put(ByteBuffer.wrap(bytes)); } + public void writeVbin32(byte[] bytes) + { + if (bytes == null) { bytes = new byte[0]; } + writeUint32(bytes.length); + put(ByteBuffer.wrap(bytes)); + } - public void writeRfc1982LongSet(RangeSet ranges) + public void writeSequenceSet(RangeSet ranges) { if (ranges == null) { - writeShort((short) 0); + writeUint16((short) 0); } else { - writeShort(ranges.size() * 8); + writeUint16(ranges.size() * 8); for (Range range : ranges) { - writeLong(range.getLower()); - writeLong(range.getUpper()); + writeUint32(range.getLower()); + writeUint32(range.getUpper()); } } } + public void writeByteRanges(RangeSet ranges) + { + throw new Error("not implemented"); + } + public void writeUuid(UUID uuid) { long msb = 0; @@ -202,15 +264,10 @@ abstract class AbstractEncoder implements Encoder if (uuid != null) { msb = uuid.getMostSignificantBits(); - uuid.getLeastSignificantBits(); + lsb = uuid.getLeastSignificantBits(); } - writeLonglong(msb); - writeLonglong(lsb); - } - - public void writeContent(String c) - { - throw new Error("Deprecated"); + writeUint64(msb); + writeUint64(lsb); } public void writeStruct(int type, Struct s) @@ -237,22 +294,27 @@ abstract class AbstractEncoder implements Encoder } } + if (type > 0) + { + writeUint16(type); + } + s.write(this); } - public void writeLongStruct(Struct s) + public void writeStruct32(Struct s) { if (s == null) { - writeLong(0); + writeUint32(0); } else { Sizer sizer = sizer(); - sizer.writeShort(s.getEncodedType()); + sizer.writeUint16(s.getEncodedType()); s.write(sizer); - writeLong(sizer.size()); - writeShort(s.getEncodedType()); + writeUint32(sizer.size()); + writeUint16(s.getEncodedType()); s.write(this); } } @@ -309,46 +371,46 @@ abstract class AbstractEncoder implements Encoder return null; } - public void writeTable(Map<String,Object> table) + public void writeMap(Map<String,Object> map) { - if (table == null) + if (map == null) { - writeLong(0); + writeUint32(0); return; } Sizer sizer = sizer(); - sizer.writeTable(table); + sizer.writeMap(map); // XXX: - 4 - writeLong(sizer.size() - 4); - writeTableEntries(table); + writeUint32(sizer.size() - 4); + writeMapEntries(map); } - protected void writeTableEntries(Map<String,Object> table) + protected void writeMapEntries(Map<String,Object> map) { - for (Map.Entry<String,Object> entry : table.entrySet()) + for (Map.Entry<String,Object> entry : map.entrySet()) { String key = entry.getKey(); Object value = entry.getValue(); Type type = encoding(value); - writeShortstr(key); + writeStr8(key); put(type.code); write(type, value); } } - public void writeSequence(List<Object> sequence) + public void writeList(List<Object> list) { Sizer sizer = sizer(); - sizer.writeSequence(sequence); + sizer.writeList(list); // XXX: - 4 - writeLong(sizer.size() - 4); - writeSequenceEntries(sequence); + writeUint32(sizer.size() - 4); + writeListEntries(list); } - protected void writeSequenceEntries(List<Object> sequence) + protected void writeListEntries(List<Object> list) { - for (Object value : sequence) + for (Object value : list) { Type type = encoding(value); put(type.code); @@ -358,10 +420,15 @@ abstract class AbstractEncoder implements Encoder public void writeArray(List<Object> array) { + if (array == null) + { + array = Collections.EMPTY_LIST; + } + Sizer sizer = sizer(); sizer.writeArray(array); // XXX: -4 - writeLong(sizer.size() - 4); + writeUint32(sizer.size() - 4); writeArrayEntries(array); } @@ -371,7 +438,7 @@ abstract class AbstractEncoder implements Encoder if (array.isEmpty()) { - type = Type.VOID; + return; } else { @@ -380,6 +447,8 @@ abstract class AbstractEncoder implements Encoder put(type.code); + writeUint32(array.size()); + for (Object value : array) { write(type, value); @@ -409,13 +478,13 @@ abstract class AbstractEncoder implements Encoder switch (width) { case 1: - writeOctet((short) size); + writeUint8((short) size); break; case 2: - writeShort(size); + writeUint16(size); break; case 4: - writeLong(size); + writeUint32(size); break; default: throw new IllegalStateException("illegal width: " + width); @@ -444,11 +513,11 @@ abstract class AbstractEncoder implements Encoder { switch (t) { - case OCTET: - case UNSIGNED_BYTE: - writeOctet(coerce(Short.class, value)); + case BIN8: + case UINT8: + writeUint8(coerce(Short.class, value)); break; - case SIGNED_BYTE: + case INT8: put(coerce(Byte.class, value)); break; case CHAR: @@ -465,85 +534,78 @@ abstract class AbstractEncoder implements Encoder } break; - case TWO_OCTETS: - case UNSIGNED_SHORT: - writeShort(coerce(Integer.class, value)); + case BIN16: + case UINT16: + writeUint16(coerce(Integer.class, value)); break; - case SIGNED_SHORT: - writeShort(coerce(Short.class, value)); + case INT16: + writeUint16(coerce(Short.class, value)); break; - case FOUR_OCTETS: - case UNSIGNED_INT: - writeLong(coerce(Long.class, value)); + case BIN32: + case UINT32: + writeUint32(coerce(Long.class, value)); break; - case UTF32_CHAR: - case SIGNED_INT: - writeLong(coerce(Integer.class, value)); + case CHAR_UTF32: + case INT32: + writeUint32(coerce(Integer.class, value)); break; case FLOAT: - writeLong(Float.floatToIntBits(coerce(Float.class, value))); + writeUint32(Float.floatToIntBits(coerce(Float.class, value))); break; - case EIGHT_OCTETS: - case SIGNED_LONG: - case UNSIGNED_LONG: + case BIN64: + case UINT64: + case INT64: case DATETIME: - writeLonglong(coerce(Long.class, value)); + writeUint64(coerce(Long.class, value)); break; case DOUBLE: long bits = Double.doubleToLongBits(coerce(Double.class, value)); - writeLonglong(bits); - break; - - case SIXTEEN_OCTETS: - case THIRTY_TWO_OCTETS: - case SIXTY_FOUR_OCTETS: - case _128_OCTETS: - case SHORT_BINARY: - case BINARY: - case LONG_BINARY: - writeBytes(t, coerce(byte[].class, value)); + writeUint64(bits); break; case UUID: writeUuid(coerce(UUID.class, value)); break; - case SHORT_STRING: - case SHORT_UTF8_STRING: - case SHORT_UTF16_STRING: - case SHORT_UTF32_STRING: - case STRING: - case UTF8_STRING: - case UTF16_STRING: - case UTF32_STRING: - case LONG_STRING: - case LONG_UTF8_STRING: - case LONG_UTF16_STRING: - case LONG_UTF32_STRING: + case STR8: + writeStr8(coerce(String.class, value)); + break; + + case STR16: + writeStr16(coerce(String.class, value)); + break; + + case STR8_LATIN: + case STR8_UTF16: + case STR16_LATIN: + case STR16_UTF16: // XXX: need to do character conversion writeBytes(t, coerce(String.class, value).getBytes()); break; - case TABLE: - writeTable((Map<String,Object>) coerce(Map.class, value)); + case MAP: + writeMap((Map<String,Object>) coerce(Map.class, value)); break; - case SEQUENCE: - writeSequence(coerce(List.class, value)); + case LIST: + writeList(coerce(List.class, value)); break; case ARRAY: writeArray(coerce(List.class, value)); break; + case STRUCT32: + writeStruct32(coerce(Struct.class, value)); + break; - case FIVE_OCTETS: - case DECIMAL: - case NINE_OCTETS: - case LONG_DECIMAL: + case BIN40: + case DEC32: + case BIN72: + case DEC64: // XXX: what types are we supposed to use here? writeBytes(t, coerce(byte[].class, value)); break; diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java index f0738e0a91..df2d208118 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java @@ -38,26 +38,30 @@ public interface Decoder { boolean readBit(); - short readOctet(); - int readShort(); - long readLong(); - long readLonglong(); + short readUint8(); + int readUint16(); + long readUint32(); + long readUint64(); - long readTimestamp(); - - String readShortstr(); - String readLongstr(); - - RangeSet readRfc1982LongSet(); + long readDatetime(); UUID readUuid(); - String readContent(); + int readSequenceNo(); + RangeSet readSequenceSet(); // XXX + RangeSet readByteRanges(); // XXX - Struct readStruct(int type); - Struct readLongStruct(); + String readStr8(); + String readStr16(); - Map<String,Object> readTable(); - List<Object> readSequence(); + byte[] readVbin8(); + byte[] readVbin16(); + byte[] readVbin32(); + + Struct readStruct32(); + Map<String,Object> readMap(); + List<Object> readList(); List<Object> readArray(); + Struct readStruct(int type); + } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java index 1b2fe0213e..0449ba6702 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java @@ -40,26 +40,30 @@ public interface Encoder void flush(); void writeBit(boolean b); - void writeOctet(short b); - void writeShort(int s); - void writeLong(long i); - void writeLonglong(long l); + void writeUint8(short b); + void writeUint16(int s); + void writeUint32(long i); + void writeUint64(long l); - void writeTimestamp(long l); - - void writeShortstr(String s); - void writeLongstr(String s); - - void writeRfc1982LongSet(RangeSet ranges); + void writeDatetime(long l); void writeUuid(UUID uuid); - void writeContent(String c); + void writeSequenceNo(int s); + void writeSequenceSet(RangeSet ranges); // XXX + void writeByteRanges(RangeSet ranges); // XXX - void writeStruct(int type, Struct s); - void writeLongStruct(Struct s); + void writeStr8(String s); + void writeStr16(String s); - void writeTable(Map<String,Object> table); - void writeSequence(List<Object> sequence); + void writeVbin8(byte[] bytes); + void writeVbin16(byte[] bytes); + void writeVbin32(byte[] bytes); + + void writeStruct32(Struct s); + void writeMap(Map<String,Object> map); + void writeList(List<Object> list); void writeArray(List<Object> array); + void writeStruct(int type, Struct s); + } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java index b98bf98239..8ffdd5150b 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java @@ -38,30 +38,34 @@ public interface Sizer extends Encoder public static final Sizer NULL = new Sizer() { - public void flush() {}; + public void flush() {} - public void writeBit(boolean b) {}; - public void writeOctet(short b) {}; - public void writeShort(int s) {}; - public void writeLong(long i) {}; - public void writeLonglong(long l) {}; + public void writeBit(boolean b) {} + public void writeUint8(short b) {} + public void writeUint16(int s) {} + public void writeUint32(long i) {} + public void writeUint64(long l) {} - public void writeTimestamp(long l) {}; + public void writeDatetime(long l) {} + public void writeUuid(UUID uuid) {} - public void writeShortstr(String s) {}; - public void writeLongstr(String s) {}; + public void writeSequenceNo(int s) {} + public void writeSequenceSet(RangeSet ranges) {} // XXX + public void writeByteRanges(RangeSet ranges) {} // XXX - public void writeRfc1982LongSet(RangeSet ranges) {}; - public void writeUuid(UUID uuid) {}; + public void writeStr8(String s) {} + public void writeStr16(String s) {} - public void writeContent(String c) {}; + public void writeVbin8(byte[] bytes) {} + public void writeVbin16(byte[] bytes) {} + public void writeVbin32(byte[] bytes) {} - public void writeStruct(int type, Struct s) {}; - public void writeLongStruct(Struct s) {}; + public void writeStruct32(Struct s) {} + public void writeMap(Map<String,Object> map) {} + public void writeList(List<Object> list) {} + public void writeArray(List<Object> array) {} - public void writeTable(Map<String,Object> table) {}; - public void writeSequence(List<Object> sequence) {}; - public void writeArray(List<Object> array) {}; + public void writeStruct(int type, Struct s) {} public int getSize() { return 0; } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java index 5c35596fd8..d493245f0c 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java @@ -37,6 +37,7 @@ import org.apache.qpidity.transport.ProtocolError; import org.apache.qpidity.transport.ProtocolEvent; import org.apache.qpidity.transport.ProtocolHeader; import org.apache.qpidity.transport.Receiver; +import org.apache.qpidity.transport.SegmentType; import org.apache.qpidity.transport.Struct; @@ -118,7 +119,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate { switch (frame.getType()) { - case Frame.BODY: + case BODY: emit(frame, new Data(frame, frame.isFirstFrame(), frame.isLastFrame())); break; @@ -158,22 +159,29 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate } } - private ProtocolEvent decode(Frame frame, byte type, List<ByteBuffer> segment) + private ProtocolEvent decode(Frame frame, SegmentType type, List<ByteBuffer> segment) { FragmentDecoder dec = new FragmentDecoder(segment.iterator()); switch (type) { - case Frame.METHOD: - int methodType = dec.readShort(); - Method method = Method.create(methodType); - method.read(dec); - return method; - case Frame.HEADER: + case CONTROL: + int controlType = dec.readUint16(); + Method control = Method.create(controlType); + control.read(dec); + return control; + case COMMAND: + int commandType = dec.readUint16(); + // read in the session header, right now we don't use it + dec.readUint16(); + Method command = Method.create(commandType); + command.read(dec); + return command; + case HEADER: List<Struct> structs = new ArrayList(); while (dec.hasRemaining()) { - structs.add(dec.readLongStruct()); + structs.add(dec.readStruct32()); } return new Header(structs,frame.isLastFrame() && frame.isLastSegment()); default: diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java index d38d8ded98..0357df6e86 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java @@ -31,6 +31,7 @@ import org.apache.qpidity.transport.ProtocolDelegate; import org.apache.qpidity.transport.ProtocolError; import org.apache.qpidity.transport.ProtocolEvent; import org.apache.qpidity.transport.ProtocolHeader; +import org.apache.qpidity.transport.SegmentType; import org.apache.qpidity.transport.Sender; import org.apache.qpidity.transport.Struct; @@ -76,50 +77,50 @@ public class Disassembler implements Sender<ConnectionEvent>, sender.close(); } - private void fragment(byte flags, byte type, ConnectionEvent event, + private void fragment(byte flags, SegmentType type, ConnectionEvent event, ByteBuffer buf, boolean first, boolean last) { if(!buf.hasRemaining()) { //empty data - byte nflags = flags; + byte nflags = flags; if (first) { nflags |= FIRST_FRAME; first = false; } - nflags |= LAST_FRAME; - Frame frame = new Frame(nflags, type, + nflags |= LAST_FRAME; + Frame frame = new Frame(nflags, type, event.getProtocolEvent().getEncodedTrack(), event.getChannel()); - // frame.addFragment(buf); + // frame.addFragment(buf); sender.send(frame); } else { - while (buf.hasRemaining()) - { - ByteBuffer slice = buf.slice(); - slice.limit(min(maxPayload, slice.remaining())); - buf.position(buf.position() + slice.remaining()); - - byte newflags = flags; - if (first) + while (buf.hasRemaining()) { - newflags |= FIRST_FRAME; - first = false; + ByteBuffer slice = buf.slice(); + slice.limit(min(maxPayload, slice.remaining())); + buf.position(buf.position() + slice.remaining()); + + byte newflags = flags; + if (first) + { + newflags |= FIRST_FRAME; + first = false; + } + if (last && !buf.hasRemaining()) + { + newflags |= LAST_FRAME; + } + + Frame frame = new Frame(newflags, type, + event.getProtocolEvent().getEncodedTrack(), + event.getChannel()); + frame.addFragment(slice); + sender.send(frame); } - if (last && !buf.hasRemaining()) - { - newflags |= LAST_FRAME; - } - - Frame frame = new Frame(newflags, type, - event.getProtocolEvent().getEncodedTrack(), - event.getChannel()); - frame.addFragment(slice); - sender.send(frame); - } } } @@ -128,15 +129,40 @@ public class Disassembler implements Sender<ConnectionEvent>, sender.send(header); } - public void method(ConnectionEvent event, Method method) + public void control(ConnectionEvent event, Method method) + { + method(event, method, SegmentType.CONTROL); + } + + public void command(ConnectionEvent event, Method method) + { + method(event, method, SegmentType.COMMAND); + } + + private void method(ConnectionEvent event, Method method, SegmentType type) { SizeEncoder sizer = new SizeEncoder(); - sizer.writeShort(method.getEncodedType()); + sizer.writeUint16(method.getEncodedType()); + if (type == SegmentType.COMMAND) + { + sizer.writeUint16(0); + } method.write(sizer); ByteBuffer buf = ByteBuffer.allocate(sizer.size()); BBEncoder enc = new BBEncoder(buf); - enc.writeShort(method.getEncodedType()); + enc.writeUint16(method.getEncodedType()); + if (type == SegmentType.COMMAND) + { + if (method.isSync()) + { + enc.writeUint16(0x0101); + } + else + { + enc.writeUint16(0x0100); + } + } method.write(enc); enc.flush(); buf.flip(); @@ -148,7 +174,7 @@ public class Disassembler implements Sender<ConnectionEvent>, flags |= LAST_SEG; } - fragment(flags, METHOD, event, buf, true, true); + fragment(flags, type, event, buf, true, true); } public void header(ConnectionEvent event, Header header) @@ -159,24 +185,24 @@ public class Disassembler implements Sender<ConnectionEvent>, SizeEncoder sizer = new SizeEncoder(); for (Struct st : header.getStructs()) { - sizer.writeLongStruct(st); + sizer.writeStruct32(st); } buf = ByteBuffer.allocate(sizer.size()); BBEncoder enc = new BBEncoder(buf); for (Struct st : header.getStructs()) { - enc.writeLongStruct(st); + enc.writeStruct32(st); enc.flush(); } header.setBuf(buf); } else { - buf = header.getBuf(); + buf = header.getBuf(); } buf.flip(); - fragment((byte) 0x0, HEADER, event, buf, true, true); + fragment((byte) 0x0, SegmentType.HEADER, event, buf, true, true); } public void data(ConnectionEvent event, Data data) @@ -187,7 +213,7 @@ public class Disassembler implements Sender<ConnectionEvent>, { ByteBuffer buf = it.next(); boolean last = data.isLast() && !it.hasNext(); - fragment(LAST_SEG, BODY, event, buf, first, last); + fragment(LAST_SEG, SegmentType.BODY, event, buf, first, last); first = false; } } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java index a5c5db4dba..c5fb7b9986 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java @@ -20,6 +20,7 @@ */ package org.apache.qpidity.transport.network; +import org.apache.qpidity.transport.SegmentType; import org.apache.qpidity.transport.util.SliceIterator; import java.nio.ByteBuffer; @@ -48,10 +49,6 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer> public static final byte L3 = 2; public static final byte L4 = 3; - public static final byte METHOD = 1; - public static final byte HEADER = 2; - public static final byte BODY = 3; - public static final byte RESERVED = 0x0; public static final byte VERSION = 0x0; @@ -62,13 +59,13 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer> public static final byte LAST_FRAME = 0x1; final private byte flags; - final private byte type; + final private SegmentType type; final private byte track; final private int channel; final private List<ByteBuffer> fragments; private int size; - public Frame(byte flags, byte type, byte track, int channel) + public Frame(byte flags, SegmentType type, byte track, int channel) { this.flags = flags; this.type = type; @@ -99,7 +96,7 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer> return size; } - public byte getType() + public SegmentType getType() { return type; } @@ -153,7 +150,7 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer> { StringBuilder str = new StringBuilder(); str.append(String.format - ("[%05d %05d %1d %1d %d%d%d%d]", getChannel(), getSize(), + ("[%05d %05d %1d %1d %d%d%d%d] ", getChannel(), getSize(), getTrack(), getType(), isFirstSegment() ? 1 : 0, isLastSegment() ? 1 : 0, isFirstFrame() ? 1 : 0, isLastFrame() ? 1 : 0)); @@ -170,7 +167,7 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer> str.append(" | "); } - str.append(str(buf, 20)); + str.append(str(buf)); } return str.toString(); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java index 871c45743e..2d41a9f516 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java @@ -22,10 +22,10 @@ package org.apache.qpidity.transport.network; import java.nio.ByteBuffer; -import org.apache.qpidity.transport.Constant; import org.apache.qpidity.transport.ProtocolError; import org.apache.qpidity.transport.ProtocolHeader; import org.apache.qpidity.transport.Receiver; +import org.apache.qpidity.transport.SegmentType; import static org.apache.qpidity.transport.util.Functions.*; @@ -77,7 +77,7 @@ public class InputHandler implements Receiver<ByteBuffer> private byte minor; private byte flags; - private byte type; + private SegmentType type; private byte track; private int channel; private int size; @@ -146,7 +146,7 @@ public class InputHandler implements Receiver<ByteBuffer> flags = buf.get(); return FRAME_HDR_TYPE; case FRAME_HDR_TYPE: - type = buf.get(); + type = SegmentType.get(buf.get()); return FRAME_HDR_SIZE1; case FRAME_HDR_SIZE1: size = (0xFF & buf.get()) << 8; @@ -218,7 +218,7 @@ public class InputHandler implements Receiver<ByteBuffer> return FRAME_END; } case FRAME_END: - return expect(buf, Constant.FRAME_END, FRAME_HDR); + return expect(buf, OutputHandler.FRAME_END, FRAME_HDR); default: throw new IllegalStateException(); } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java index 9f770bcb1c..8f615cf80d 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java @@ -67,11 +67,13 @@ public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate } } + public static final int FRAME_END = 0xCE; + public void frame(Frame frame) { ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE + frame.getSize() + 1); hdr.put(frame.getFlags()); - hdr.put(frame.getType()); + hdr.put((byte) frame.getType().getValue()); hdr.putShort((short) (frame.getSize() + HEADER_SIZE)); hdr.put(RESERVED); hdr.put(frame.getTrack()); @@ -84,7 +86,7 @@ public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate { hdr.put(buf); } - hdr.put((byte) Constant.FRAME_END); + hdr.put((byte) FRAME_END); hdr.flip(); synchronized (lock) { diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java index 2e4875cf42..7da719087c 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java @@ -200,7 +200,7 @@ public class MinaHandler<E> implements IoHandler ConnectionDelegate delegate) { return connect(host, port, new ConnectionBinding - (delegate, InputHandler.State.FRAME_HDR)); + (delegate, InputHandler.State.PROTO_HDR)); } private static class ConnectionBinding diff --git a/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java b/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java index 7dbb3d30e8..14e756c047 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java @@ -52,15 +52,24 @@ public class Functions public static final String str(ByteBuffer buf, int limit) { StringBuilder str = new StringBuilder(); + str.append('"'); + for (int i = 0; i < min(buf.remaining(), limit); i++) { - if (i > 0 && i % 2 == 0) + byte c = buf.get(buf.position() + i); + + if (c > 31 && c < 127 && c != '\\') { - str.append(" "); + str.append((char)c); + } + else + { + str.append(String.format("\\x%02x", c)); } - str.append(String.format("%02x", buf.get(buf.position() + i))); } + str.append('"'); + return str.toString(); } diff --git a/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java index d7ece9b745..1a83786e12 100644 --- a/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java +++ b/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java @@ -100,12 +100,12 @@ public class ConnectionTest extends TestCase } Channel ch = conn.getChannel(0); - Session ssn = new Session(); + Session ssn = new Session("test".getBytes()); ssn.attach(ch); try { - ssn.sessionOpen(1234); + ssn.sessionAttach(ssn.getName()); fail("writing to a closed socket succeeded"); } catch (TransportException e) diff --git a/java/common/templating.py b/java/common/templating.py new file mode 100644 index 0000000000..832b7ecb9c --- /dev/null +++ b/java/common/templating.py @@ -0,0 +1,101 @@ + +class Parser: + + def __init__(self, **kwargs): + self.output = "" + self.environ = {"out": self.parse} + for k, v in kwargs.items(): + self.environ[k] = v + self.text = "" + self.level = 0 + self.line = None + + def action(self, actor): + text = self.text + self.text = "" + actor(text) + + def out(self, text): + self.output += text + + def prefix_lines(self, text): + return "%s%s" % ("\n"*(self.line - 1 - text.count("\n")), text) + + def evaluate(self, text): + self.out(str(eval(self.prefix_lines(text), self.environ, self.environ))) + + def execute(self, text): + exec self.prefix_lines(text) in self.environ, self.environ + + def parse(self, input): + old_line = self.line + try: + state = self.start + self.line = 1 + for ch in input: + state = state(ch) + if ch == "\n": + self.line += 1 + if state == self.start: + self.action(self.out) + elif state == self.alnum: + self.action(self.evaluate) + else: + raise ParseError() + finally: + self.line = old_line + + def start(self, ch): + if ch == "$": + return self.dollar + else: + self.text += ch + return self.start + + def dollar(self, ch): + if ch == "$": + self.text += "$" + return self.start + elif ch == "(": + self.action(self.out) + return self.expression + elif ch == "{": + self.action(self.out) + return self.block + else: + self.action(self.out) + self.text += ch + return self.alnum + + def alnum(self, ch): + if ch.isalnum(): + self.text += ch + return self.alnum + else: + self.action(self.evaluate) + self.text += ch + return self.start + + def match(self, ch, start, end): + if ch == start: + self.level += 1 + if ch == end: + self.level -= 1 + + def block(self, ch): + if not self.level and ch == "}": + self.action(self.execute) + return self.start + else: + self.match(ch, "{", "}") + self.text += ch + return self.block + + def expression(self, ch): + if not self.level and ch == ")": + self.action(self.evaluate) + return self.start + else: + self.match(ch, "(", ")") + self.text += ch + return self.expression |