summaryrefslogtreecommitdiff
path: root/java/common
diff options
context:
space:
mode:
Diffstat (limited to 'java/common')
-rw-r--r--java/common/Composite.tpl173
-rw-r--r--java/common/Constant.tpl14
-rw-r--r--java/common/Enum.tpl36
-rw-r--r--java/common/Invoker.tpl41
-rw-r--r--java/common/MethodDelegate.tpl12
-rw-r--r--java/common/Option.tpl19
-rw-r--r--java/common/StructFactory.tpl39
-rw-r--r--java/common/Type.tpl63
-rw-r--r--java/common/build.xml7
-rwxr-xr-xjava/common/codegen63
-rwxr-xr-xjava/common/generate567
-rw-r--r--java/common/genutil.py207
-rw-r--r--java/common/src/main/java/org/apache/qpidity/SecurityHelper.java8
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ToyBroker.java21
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ToyClient.java12
-rw-r--r--java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java26
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Channel.java10
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java9
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java41
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java52
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Data.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Method.java23
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java4
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java7
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Session.java130
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java45
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Struct.java21
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java6
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java212
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java288
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java34
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java34
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java38
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java26
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java96
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java15
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java8
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java6
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java15
-rw-r--r--java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java4
-rw-r--r--java/common/templating.py101
42 files changed, 1493 insertions, 1044 deletions
diff --git a/java/common/Composite.tpl b/java/common/Composite.tpl
new file mode 100644
index 0000000000..4172ffdefc
--- /dev/null
+++ b/java/common/Composite.tpl
@@ -0,0 +1,173 @@
+package org.apache.qpidity.transport;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpidity.transport.codec.Decoder;
+import org.apache.qpidity.transport.codec.Encodable;
+import org.apache.qpidity.transport.codec.Encoder;
+
+import org.apache.qpidity.transport.network.Frame;
+
+${
+from genutil import *
+
+cls = klass(type)["@name"]
+
+if type.name in ("control", "command"):
+ base = "Method"
+ size = 0
+ pack = 2
+ if type["segments"]:
+ payload = "true"
+ else:
+ payload = "false"
+ if type.name == "control" and cls == "connection":
+ track = "Frame.L1"
+ elif cls == "session" and type["@name"] in ("attach", "attached", "detach", "detached"):
+ track = "Frame.L2"
+ elif type.name == "command":
+ track = "Frame.L4"
+ else:
+ track = "Frame.L3"
+else:
+ base = "Struct"
+ size = type["@size"]
+ pack = type["@pack"]
+ payload = "false"
+ track = "-1"
+
+typecode = code(type)
+}
+
+public class $name extends $base {
+
+ public static final int TYPE = $typecode;
+
+ public final int getStructType() {
+ return TYPE;
+ }
+
+ public final int getSizeWidth() {
+ return $size;
+ }
+
+ public final int getPackWidth() {
+ return $pack;
+ }
+
+ public final boolean hasPayload() {
+ return $payload;
+ }
+
+ public final byte getEncodedTrack() {
+ return $track;
+ }
+
+ private static final List<Field<?,?>> FIELDS = new ArrayList<Field<?,?>>();
+ public List<Field<?,?>> getFields() { return FIELDS; }
+
+${
+fields = get_fields(type)
+params = get_parameters(fields)
+options = get_options(fields)
+
+for f in fields:
+ out(" private boolean has_$(f.name);\n")
+ out(" private $(f.type) $(f.name);\n")
+}
+
+${
+if fields:
+ out(" public $name() {}\n")
+}
+
+ public $name($(", ".join(params))) {
+${
+for f in fields:
+ if f.option: continue
+ out(" $(f.set)($(f.name));\n")
+
+for f in options:
+ out(" boolean _$(f.name) = false;\n")
+
+if options:
+ out("""
+ for (int i=0; i < _options.length; i++) {
+ switch (_options[i]) {
+""")
+
+ for f in options:
+ out(" case $(f.option): _$(f.name) = true; break;\n")
+
+ out(""" case NO_OPTION: break;
+ default: throw new IllegalArgumentException("invalid option: " + _options[i]);
+ }
+ }
+""")
+
+for f in options:
+ out(" $(f.set)(_$(f.name));\n")
+}
+ }
+
+ public <C> void dispatch(C context, MethodDelegate<C> delegate) {
+ delegate.$(dromedary(name))(context, this);
+ }
+
+${
+for f in fields:
+ out("""
+ public final boolean $(f.has)() {
+ return has_$(f.name);
+ }
+
+ public final $name $(f.clear)() {
+ this.has_$(f.name) = false;
+ this.$(f.name) = $(f.default);
+ this.dirty = true;
+ return this;
+ }
+
+ public final $(f.type) $(f.get)() {
+ return $(f.name);
+ }
+
+ public final $name $(f.set)($(f.type) value) {
+ this.$(f.name) = value;
+ this.has_$(f.name) = true;
+ this.dirty = true;
+ return this;
+ }
+
+ public final $name $(f.name)($(f.type) value) {
+ this.$(f.name) = value;
+ this.has_$(f.name) = true;
+ this.dirty = true;
+ return this;
+ }
+
+ static {
+ FIELDS.add(new Field<$name,$(jref(jclass(f.type)))>($name.class, $(jref(jclass(f.type))).class, "$(f.name)", $(f.index)) {
+ public boolean has(Object struct) {
+ return check(struct).has_$(f.name);
+ }
+ public void has(Object struct, boolean value) {
+ check(struct).has_$(f.name) = value;
+ }
+ public $(jref(f.type)) get(Object struct) {
+ return check(struct).$(f.get)();
+ }
+ public void read(Decoder dec, Object struct) {
+ check(struct).$(f.name) = $(f.read);
+ check(struct).dirty = true;
+ }
+ public void write(Encoder enc, Object struct) {
+ $(f.write);
+ }
+ });
+ }
+""")
+}}
diff --git a/java/common/Constant.tpl b/java/common/Constant.tpl
new file mode 100644
index 0000000000..695812ea75
--- /dev/null
+++ b/java/common/Constant.tpl
@@ -0,0 +1,14 @@
+package org.apache.qpidity.transport;
+
+${from genutil import *}
+
+public interface Constant
+{
+${
+constants = spec.query["amqp/constant"]
+
+for c in constants:
+ name = scream(c["@name"])
+ value = c["@value"]
+ out(" public static final int $name = $value;\n")
+}}
diff --git a/java/common/Enum.tpl b/java/common/Enum.tpl
new file mode 100644
index 0000000000..337feb7065
--- /dev/null
+++ b/java/common/Enum.tpl
@@ -0,0 +1,36 @@
+package org.apache.qpidity.transport;
+
+public enum $name {
+${
+from genutil import *
+
+vtype = jtype(resolve_type(type))
+
+choices = [(scream(ch["@name"]), "(%s) %s" % (vtype, ch["@value"]))
+ for ch in type.query["enum/choice"]]
+}
+ $(",\n ".join(["%s(%s)" % ch for ch in choices]));
+
+ private final $vtype value;
+
+ $name($vtype value)
+ {
+ this.value = value;
+ }
+
+ public $vtype getValue()
+ {
+ return value;
+ }
+
+ public static $name get($vtype value)
+ {
+ switch (value)
+ {
+${
+for ch, value in choices:
+ out(' case $value: return $ch;\n')
+} default: throw new IllegalArgumentException("no such value: " + value);
+ }
+ }
+}
diff --git a/java/common/Invoker.tpl b/java/common/Invoker.tpl
new file mode 100644
index 0000000000..d9905c71a0
--- /dev/null
+++ b/java/common/Invoker.tpl
@@ -0,0 +1,41 @@
+package org.apache.qpidity.transport;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public abstract class Invoker {
+
+ protected abstract void invoke(Method method);
+ protected abstract <T> Future<T> invoke(Method method, Class<T> resultClass);
+
+${
+from genutil import *
+
+for c in composites:
+ name = cname(c)
+ fields = get_fields(c)
+ params = get_parameters(fields)
+ args = get_arguments(fields)
+ result = c["result"]
+ if result:
+ if not result["@type"]:
+ rname = cname(result["struct"])
+ else:
+ rname = cname(result, "@type")
+ jresult = "Future<%s>" % rname
+ jreturn = "return "
+ jclass = ", %s.class" % rname
+ else:
+ jresult = "void"
+ jreturn = ""
+ jclass = ""
+
+ out("""
+ public $jresult $(dromedary(name))($(", ".join(params))) {
+ $(jreturn)invoke(new $name($(", ".join(args)))$jclass);
+ }
+""")
+}
+
+}
diff --git a/java/common/MethodDelegate.tpl b/java/common/MethodDelegate.tpl
new file mode 100644
index 0000000000..e5ab1ae1e7
--- /dev/null
+++ b/java/common/MethodDelegate.tpl
@@ -0,0 +1,12 @@
+package org.apache.qpidity.transport;
+
+public abstract class MethodDelegate<C> {
+
+${
+from genutil import *
+
+for c in composites:
+ name = cname(c)
+ out(" public void $(dromedary(name))(C context, $name struct) {}\n")
+}
+}
diff --git a/java/common/Option.tpl b/java/common/Option.tpl
new file mode 100644
index 0000000000..5fa2b95b9f
--- /dev/null
+++ b/java/common/Option.tpl
@@ -0,0 +1,19 @@
+package org.apache.qpidity.transport;
+
+public enum Option {
+
+${
+from genutil import *
+
+options = {}
+
+for c in composites:
+ for f in c.query["field"]:
+ t = resolve_type(f)
+ if t["@name"] == "bit":
+ option = scream(f["@name"])
+ if not options.has_key(option):
+ options[option] = None
+ out(" $option,\n")}
+ NO_OPTION
+}
diff --git a/java/common/StructFactory.tpl b/java/common/StructFactory.tpl
new file mode 100644
index 0000000000..b27621b1d2
--- /dev/null
+++ b/java/common/StructFactory.tpl
@@ -0,0 +1,39 @@
+package org.apache.qpidity.transport;
+
+class StructFactory {
+
+ public static Struct create(int type)
+ {
+ switch (type)
+ {
+${
+from genutil import *
+
+fragment = """ case $name.TYPE:
+ return new $name();
+"""
+
+for c in composites:
+ name = cname(c)
+ if c.name == "struct":
+ out(fragment)
+} default:
+ throw new IllegalArgumentException("type: " + type);
+ }
+ }
+
+ public static Struct createInstruction(int type)
+ {
+ switch (type)
+ {
+${
+for c in composites:
+ name = cname(c)
+ if c.name in ("command", "control"):
+ out(fragment)
+} default:
+ throw new IllegalArgumentException("type: " + type);
+ }
+ }
+
+}
diff --git a/java/common/Type.tpl b/java/common/Type.tpl
new file mode 100644
index 0000000000..c869934538
--- /dev/null
+++ b/java/common/Type.tpl
@@ -0,0 +1,63 @@
+package org.apache.qpidity.transport;
+
+${from genutil import *}
+
+public enum Type
+{
+
+${
+types = spec.query["amqp/type"] + spec.query["amqp/class/type"]
+codes = {}
+first = True
+for t in types:
+ code = t["@code"]
+ fix_width = t["@fixed-width"]
+ var_width = t["@variable-width"]
+
+ if code is None:
+ continue
+
+ if fix_width is None:
+ width = var_width
+ fixed = "false"
+ else:
+ width = fix_width
+ fixed = "true"
+
+ name = scream(t["@name"])
+ codes[code] = name
+
+ if first:
+ first = False
+ else:
+ out(",\n")
+
+ out(" $name((byte) $code, $width, $fixed)")
+};
+
+ public byte code;
+ public int width;
+ public boolean fixed;
+
+ Type(byte code, int width, boolean fixed)
+ {
+ this.code = code;
+ this.width = width;
+ this.fixed = fixed;
+ }
+
+ public static Type get(byte code)
+ {
+ switch (code)
+ {
+${
+keys = list(codes.keys())
+keys.sort()
+
+for code in keys:
+ out(" case (byte) $code: return $(codes[code]);\n")
+}
+ default: return null;
+ }
+ }
+}
diff --git a/java/common/build.xml b/java/common/build.xml
index 6c8bfbaf0f..796385eff3 100644
--- a/java/common/build.xml
+++ b/java/common/build.xml
@@ -34,7 +34,7 @@
<target name="check_jython_deps">
<uptodate property="jython.notRequired" targetfile="${jython.timestamp}">
- <srcfiles dir="${xml.spec.dir}" includes="amqp.0-10-preview.xml" />
+ <srcfiles dir="${xml.spec.dir}" includes="amqp.0-10-qpid-errata.xml" />
</uptodate>
</target>
@@ -42,10 +42,9 @@
<java classname="org.python.util.jython" fork="true" failonerror="true">
<arg value="-Dpython.cachedir.skip=true"/>
<arg value="-Dpython.path=${basedir}/jython-lib.jar/Lib${path.separator}${mllib.dir}"/>
- <arg value="${basedir}/generate"/>
+ <arg value="${basedir}/codegen"/>
<arg value="${module.precompiled}"/>
- <arg value="org.apache.qpidity.transport"/>
- <arg value="${xml.spec.dir}/amqp.0-10-preview.xml"/>
+ <arg value="${xml.spec.dir}/amqp.0-10-qpid-errata.xml"/>
<classpath>
<pathelement location="jython-2.2-rc2.jar"/>
</classpath>
diff --git a/java/common/codegen b/java/common/codegen
new file mode 100755
index 0000000000..f5d1577774
--- /dev/null
+++ b/java/common/codegen
@@ -0,0 +1,63 @@
+#!/usr/bin/env python
+
+import os, sys, mllib
+from templating import Parser
+from genutil import *
+
+out_dir = sys.argv[1]
+spec_file = sys.argv[2]
+pkg_dir = os.path.join(out_dir, "org/apache/qpidity/transport")
+
+if not os.path.exists(pkg_dir):
+ os.makedirs(pkg_dir)
+
+spec = mllib.xml_parse(spec_file)
+
+def excludes(nd):
+ if (nd.parent is not None and
+ nd.parent.name == "class" and
+ nd.parent["@name"] in ("file", "stream")):
+ return False
+ else:
+ return True
+
+def execute(output, template, **kwargs):
+ f = open(template)
+ input = f.read()
+ f.close()
+ p = Parser(**kwargs)
+ p.parse(input)
+ fname = os.path.join(pkg_dir, output)
+ f = open(fname, "w")
+ f.write(p.output)
+ f.close()
+
+execute("Type.java", "Type.tpl", spec = spec)
+execute("Constant.java", "Constant.tpl", spec = spec)
+
+structs = spec.query["amqp/struct"] + \
+ spec.query["amqp/class/struct", excludes] + \
+ spec.query["amqp/class/command/result/struct", excludes]
+controls = spec.query["amqp/class/control", excludes]
+commands = spec.query["amqp/class/command", excludes]
+
+composites = structs + controls + commands
+
+for c in composites:
+ name = cname(c)
+ execute("%s.java" % name, "Composite.tpl", type = c, name = name)
+
+execute("MethodDelegate.java", "MethodDelegate.tpl", composites = composites)
+execute("Option.java", "Option.tpl", composites = composites)
+execute("Invoker.java", "Invoker.tpl", composites = controls + commands)
+execute("StructFactory.java", "StructFactory.tpl", composites = composites)
+
+def is_enum(nd):
+ return nd["enum"] is not None
+
+enums = spec.query["amqp/domain", is_enum] + \
+ spec.query["amqp/class/domain", is_enum]
+
+for e in enums:
+ name = cname(e)
+ execute("%s.java" % name, "Enum.tpl", name = name, type = e)
diff --git a/java/common/generate b/java/common/generate
deleted file mode 100755
index daf8475c54..0000000000
--- a/java/common/generate
+++ /dev/null
@@ -1,567 +0,0 @@
-#!/usr/bin/python
-
-# Interim code generation script.
-
-import sys, os, mllib
-from cStringIO import StringIO
-
-out_dir=sys.argv[1]
-out_pkg = sys.argv[2]
-spec_file = sys.argv[3]
-
-spec = mllib.xml_parse(spec_file)
-
-def jbool(b):
- if b:
- return "true"
- else:
- return "false"
-
-class Output:
-
- def __init__(self, dir, package, name):
- self.dir = dir
- self.package = package
- self.name = name
- self.lines = []
-
- self.line("package %s;" % self.package)
- self.line()
- self.line("import java.util.ArrayList;")
- self.line("import java.util.List;")
- self.line("import java.util.Map;")
- self.line("import java.util.UUID;")
- self.line()
- self.line("import org.apache.qpidity.transport.codec.Decoder;")
- self.line("import org.apache.qpidity.transport.codec.Encodable;")
- self.line("import org.apache.qpidity.transport.codec.Encoder;")
- self.line()
- self.line("import org.apache.qpidity.transport.network.Frame;")
- self.line()
- self.line()
-
- def line(self, l = ""):
- self.lines.append(l)
-
- def getter(self, type, method, value, pre = None):
- self.line()
- self.line(" public final %s %s() {" % (type, method))
- if pre:
- self.line(" %s;" % pre)
- self.line(" return %s;" % value)
- self.line(" }")
-
- def setter(self, type, method, variable, value = None, pre = None,
- post = None):
- if value:
- params = ""
- else:
- params = "%s value" % type
- value = "value"
-
- self.line()
- self.line(" public final %s %s(%s) {" % (self.name, method, params))
- if pre:
- self.line(" %s;" % pre)
- self.line(" this.%s = %s;" % (variable, value))
- if post:
- self.line(" %s;" % post)
- self.line(" return this;")
- self.line(" }")
-
- def write(self):
- dir = os.path.join(self.dir, *self.package.split("."))
- if not os.path.exists(dir):
- os.makedirs(dir)
- file = os.path.join(dir, "%s.java" % self.name)
- out = open(file, "w")
- for l in self.lines:
- out.write(l)
- out.write(os.linesep)
- out.close()
-
-TYPES = {
- "longstr": "String",
- "shortstr": "String",
- "longlong": "long",
- "long": "long",
- "short": "int",
- "octet": "short",
- "bit": "boolean",
- "table": "Map<String,Object>",
- "timestamp": "long",
- "content": "String",
- "uuid": "UUID",
- "rfc1982-long-set": "RangeSet",
- "long-struct": "Struct",
- "signed-byte": "byte",
- "unsigned-byte": "short",
- "char": "char",
- "boolean": "boolean",
- "two-octets": "short",
- "signed-short": "short",
- "unsigned-short": "int",
- "four-octets": "int",
- "signed-int": "int",
- "unsigned-int": "long",
- "float": "float",
- "utf32-char": "char",
- "eight-octets": "long",
- "signed-long": "long",
- "unsigned-long": "long",
- "double": "double",
- "datetime": "long",
- "sixteen-octets": "byte[]",
- "thirty-two-octets": "byte[]",
- "sixty-four-octets": "byte[]",
- "_128-octets": "byte[]",
- "short-binary": "byte[]",
- "short-string": "String",
- "short-utf8-string": "String",
- "short-utf16-string": "String",
- "short-utf32-string": "String",
- "binary": "byte[]",
- "string": "String",
- "utf8-string": "String",
- "utf16-string": "String",
- "utf32-string": "String",
- "long-binary": "byte[]",
- "long-string": "String",
- "long-utf8-string": "String",
- "long-utf16-string": "String",
- "long-utf32-string": "String",
- "sequence": "List<Object>",
- "array": "List<Object>",
- "five-octets": "byte[]",
- "decimal": "byte[]",
- "nine-octets": "byte[]",
- "long-decimal": "byte[]",
- "void": "Void"
- }
-
-DEFAULTS = {
- "longlong": "0",
- "long": "0",
- "short": "0",
- "octet": "0",
- "timestamp": "0",
- "bit": "false"
- }
-
-TRACKS = {
- "connection": "Frame.L1",
- "session": "Frame.L2",
- "execution": "Frame.L3",
- None: None
- }
-
-def camel(offset, *args):
- parts = []
- for a in args:
- parts.extend(a.split("-"))
- return "".join(parts[:offset] + [p[0].upper() + p[1:] for p in parts[offset:]])
-
-def dromedary(s):
- return s[0].lower() + s[1:]
-
-def scream(*args):
- return "_".join([a.replace("-", "_").upper() for a in args])
-
-
-types = Output(out_dir, out_pkg, "Type")
-types.line("public enum Type")
-types.line("{")
-codes = {}
-for c in spec.query["amqp/constant"]:
- if c["@class"] == "field-table-type":
- name = c["@name"]
- if name.startswith("field-table-"):
- name = name[12:]
- if name[0].isdigit():
- name = "_" + name
- val = c["@value"]
- codes[val] = name
- if c["@width"] != None:
- width = c["@width"]
- fixed = "true"
- if c["@lfwidth"] != None:
- width = c["@lfwidth"]
- fixed = "false"
- types.line(" %s((byte) %s, %s, %s)," %
- (scream(name), val, width, fixed))
-types.line(" ;")
-
-types.line(" public byte code;")
-types.line(" public int width;")
-types.line(" public boolean fixed;")
-
-types.line(" Type(byte code, int width, boolean fixed)")
-types.line(" {")
-for arg in ("code", "width", "fixed"):
- types.line(" this.%s = %s;" % (arg, arg))
-types.line(" }")
-
-types.line(" public static Type get(byte code)")
-types.line(" {")
-types.line(" switch (code)")
-types.line(" {")
-for code, name in codes.items():
- types.line(" case (byte) %s: return %s;" % (code, scream(name)))
-types.line(" default: return null;")
-types.line(" }")
-types.line(" }")
-
-types.line("}")
-types.write()
-
-
-const = Output(out_dir, out_pkg, "Constant")
-const.line("public interface Constant")
-const.line("{")
-for d in spec.query["amqp/constant"]:
- name = d["@name"]
- val = d["@value"]
- datatype = d["@datatype"]
- if datatype == None:
- const.line("public static final int %s = %s;" % (scream(name), val))
-const.line("}")
-const.write()
-
-
-DOMAINS = {}
-STRUCTS = {}
-
-for d in spec.query["amqp/domain"]:
- name = d["@name"]
- type = d["@type"]
- if type != None:
- DOMAINS[name] = d["@type"]
- elif d["struct"] != None:
- DOMAINS[name] = name
- STRUCTS[name] = camel(0, name)
-
-def resolve(type):
- if DOMAINS.has_key(type) and DOMAINS[type] != type:
- return resolve(DOMAINS[type])
- else:
- return type
-
-def jtype(type):
- if STRUCTS.has_key(type):
- return STRUCTS[type]
- else:
- return TYPES[type]
-
-def jclass(jt):
- idx = jt.find('<')
- if idx > 0:
- return jt[:idx]
- else:
- return jt
-
-REFS = {
- "boolean": "Boolean",
- "byte": "Byte",
- "short": "Short",
- "int": "Integer",
- "long": "Long",
- "float": "Float",
- "double": "Double",
- "char": "Character"
-}
-
-def jref(jt):
- return REFS.get(jt, jt)
-
-
-OPTIONS = {}
-
-class Struct:
-
- def __init__(self, node, name, base, type, size, pack, track, content):
- self.node = node
- self.name = name
- self.base = base
- self.type = type
- self.size = size
- self.pack = pack
- self.track = track
- self.content = content
- self.fields = []
- self.ticket = False
-
- def result(self):
- r = self.node["result"]
- if not r: return
- name = r["@domain"]
- if not name:
- name = self.name + "Result"
- else:
- name = camel(0, name)
- return name
-
- def field(self, type, name):
- if name == "ticket":
- self.ticket = True
- else:
- self.fields.append((type, name))
-
- def impl(self, out):
- out.line("public class %s extends %s {" % (self.name, self.base))
-
- out.line()
- out.line(" public static final int TYPE = %d;" % self.type)
- out.getter("int", "getStructType", "TYPE")
- out.getter("int", "getSizeWidth", self.size)
- out.getter("int", "getPackWidth", self.pack)
- out.getter("boolean", "hasTicket", jbool(self.ticket))
-
- if self.base == "Method":
- out.getter("boolean", "hasPayload", jbool(self.content))
- out.getter("byte", "getEncodedTrack", self.track)
-
- out.line()
- out.line(" private static final List<Field<?,?>> FIELDS = new ArrayList<Field<?,?>>();")
- out.line(" public List<Field<?,?>> getFields() { return FIELDS; }")
- out.line()
-
- out.line()
- for type, name in self.fields:
- out.line(" private boolean has_%s;" % name)
- out.line(" private %s %s;" % (jtype(type), name))
-
- if self.fields:
- out.line()
- out.line(" public %s() {}" % self.name)
-
- out.line()
- out.line(" public %s(%s) {" % (self.name, self.parameters()))
- opts = False
- for type, name in self.fields:
- if not OPTIONS.has_key(name):
- out.line(" %s(%s);" % (camel(1, "set", name), name))
- else:
- opts = True
- if opts:
- for type, name in self.fields:
- if OPTIONS.has_key(name):
- out.line(" boolean _%s = false;" % name)
- out.line(" for (int i=0; i < _options.length; i++) {")
- out.line(" switch (_options[i]) {")
- for type, name in self.fields:
- if OPTIONS.has_key(name):
- out.line(" case %s: _%s=true; break;" % (OPTIONS[name], name))
- out.line(" case NO_OPTION: break;")
- out.line(' default: throw new IllegalArgumentException'
- '("invalid option: " + _options[i]);')
- out.line(" }")
- out.line(" }")
- for type, name in self.fields:
- if OPTIONS.has_key(name):
- out.line(" %s(_%s);" % (camel(1, "set", name), name))
- out.line(" }")
-
- out.line()
- out.line(" public <C> void dispatch(C context, MethodDelegate<C> delegate) {")
- out.line(" delegate.%s(context, this);" % dromedary(self.name))
- out.line(" }")
-
- index = 0
- for type, name in self.fields:
- out.getter("boolean", camel(1, "has", name), "has_" + name)
- out.setter("boolean", camel(1, "clear", name), "has_" + name, "false",
- post = "this.%s = %s; this.dirty = true" % (name, DEFAULTS.get(type, "null")))
- out.getter(jtype(type), camel(1, "get", name), name)
- for mname in (camel(1, "set", name), name):
- out.setter(jtype(type), mname, name,
- post = "this.has_%s = true; this.dirty = true" % name)
-
- out.line()
- out.line(' static {')
- ftype = jref(jclass(jtype(type)))
- out.line(' FIELDS.add(new Field<%s,%s>(%s.class, %s.class, "%s", %d) {' %
- (self.name, ftype, self.name, ftype, name, index))
- out.line(' public boolean has(Object struct) {')
- out.line(' return check(struct).has_%s;' % name)
- out.line(' }')
- out.line(' public void has(Object struct, boolean value) {')
- out.line(' check(struct).has_%s = value;' % name)
- out.line(' }')
- out.line(' public %s get(Object struct) {' % ftype)
- out.line(' return check(struct).%s();' % camel(1, "get", name))
- out.line(' }')
- out.line(' public void read(Decoder dec, Object struct) {')
- if TYPES.has_key(type):
- out.line(' check(struct).%s = dec.read%s();' % (name, camel(0, type)))
- elif STRUCTS.has_key(type):
- out.line(' check(struct).%s = (%s) dec.readStruct(%s.TYPE);' %
- (name, STRUCTS[type], STRUCTS[type]))
- else:
- raise Exception("unknown type: %s" % type)
- out.line(' check(struct).dirty = true;')
- out.line(' }')
- out.line(' public void write(Encoder enc, Object struct) {')
- if TYPES.has_key(type):
- out.line(' enc.write%s(check(struct).%s);' % (camel(0, type), name))
- elif STRUCTS.has_key(type):
- out.line(' enc.writeStruct(%s.TYPE, check(struct).%s);' %
- (STRUCTS[type], name))
- else:
- raise Exception("unknown type: %s" % type)
- out.line(' }')
- out.line(' });')
- out.line(' }')
- index += 1;
-
- out.line("}")
-
-
- def parameters(self):
- params = []
- var = False
- for type, name in self.fields:
- if OPTIONS.has_key(name):
- var = True
- else:
- params.append("%s %s" % (jtype(type), name))
- if var:
- params.append("Option ... _options")
- return ", ".join(params)
-
- def arguments(self):
- args = []
- var = False
- for type, name in self.fields:
- if OPTIONS.has_key(name):
- var = True
- else:
- args.append(name)
- if var:
- args.append("_options")
- return ", ".join(args)
-
-CLASSES = {"file": False, "basic": False, "stream": False, "tunnel": False}
-
-PACK_WIDTHS = {
- None: 2,
- "octet": 1,
- "short": 2,
- "long": 4
- }
-
-SIZE_WIDTHS = PACK_WIDTHS.copy()
-SIZE_WIDTHS[None] = 0
-
-class Visitor(mllib.transforms.Visitor):
-
- def __init__(self):
- self.structs = []
- self.untyped = -1
-
- def do_method(self, m):
- if CLASSES.get(m.parent["@name"], True):
- name = camel(0, m.parent["@name"], m["@name"])
- type = int(m.parent["@index"])*256 + int(m["@index"])
- self.structs.append((name, "Method", type, 0, 2, m))
- self.descend(m)
-
- def do_domain(self, d):
- s = d["struct"]
- if s:
- name = camel(0, d["@name"])
- st = s["@type"]
- if st in (None, "none", ""):
- type = self.untyped
- self.untyped -= 1
- else:
- type = int(st)
- self.structs.append((name, "Struct", type, SIZE_WIDTHS[s["@size"]],
- PACK_WIDTHS[s["@pack"]], s))
- self.descend(d)
-
- def do_result(self, r):
- s = r["struct"]
- if s:
- name = camel(0, r.parent.parent["@name"], r.parent["@name"], "Result")
- type = int(r.parent.parent["@index"]) * 256 + int(s["@type"])
- self.structs.append((name, "Result", type, SIZE_WIDTHS[s["@size"]],
- PACK_WIDTHS[s["@pack"]], s))
- self.descend(r)
-
-v = Visitor()
-spec.dispatch(v)
-
-opts = Output(out_dir, out_pkg, "Option")
-opts.line("public enum Option {")
-structs = []
-for name, base, typecode, size, pack, m in v.structs:
- struct = Struct(m, name, base, typecode, size, pack,
- TRACKS.get(m.parent["@name"], "Frame.L4"),
- m["@content"] == "1")
- for f in m.query["field"]:
- type = resolve(f["@domain"])
- name = camel(1, f["@name"])
- struct.field(type, name)
- if type == "bit":
- opt_name = scream(f["@name"])
- if not OPTIONS.has_key(name):
- OPTIONS[name] = opt_name
- opts.line(" %s," % opt_name)
- structs.append(struct)
-opts.line(" %s," % "NO_OPTION")
-opts.line("}")
-opts.write()
-
-
-
-
-for s in structs:
- impl = Output(out_dir, out_pkg, s.name)
- s.impl(impl)
- impl.write()
-
-fct = Output(out_dir, out_pkg, "StructFactory")
-fct.line("class StructFactory {")
-fct.line(" public static Struct create(int type) {")
-fct.line(" switch (type) {")
-for s in structs:
- fct.line(" case %s.TYPE:" % s.name)
- fct.line(" return new %s();" % s.name)
-fct.line(" default:")
-fct.line(' throw new IllegalArgumentException("type: " + type);')
-fct.line(" }")
-fct.line(" }")
-fct.line("}");
-fct.write()
-
-dlg = Output(out_dir, out_pkg, "MethodDelegate")
-dlg.line("public abstract class MethodDelegate<C> {")
-for s in structs:
- dlg.line(" public void %s(C context, %s struct) {}" %
- (dromedary(s.name), s.name))
-dlg.line("}")
-dlg.write()
-
-inv = Output(out_dir, out_pkg, "Invoker")
-inv.line("public abstract class Invoker {")
-inv.line()
-inv.line(" protected abstract void invoke(Method method);")
-inv.line(" protected abstract <T> Future<T> invoke(Method method, Class<T> resultClass);")
-inv.line()
-for s in structs:
- if s.base != "Method": continue
- dname = dromedary(s.name)
- result = s.result()
- if result:
- result_type = "Future<%s>" % result
- else:
- result_type = "void"
- inv.line(" public %s %s(%s) {" % (result_type, dname, s.parameters()))
- if result:
- inv.line(" return invoke(new %s(%s), %s.class);" %
- (s.name, s.arguments(), result))
- else:
- inv.line(" invoke(new %s(%s));" % (s.name, s.arguments()))
- inv.line(" }")
-inv.line("}")
-inv.write()
diff --git a/java/common/genutil.py b/java/common/genutil.py
new file mode 100644
index 0000000000..5206b50bbd
--- /dev/null
+++ b/java/common/genutil.py
@@ -0,0 +1,207 @@
+
+def camel(offset, *args):
+ parts = []
+ for a in args:
+ parts.extend(a.split("-"))
+ return "".join(parts[:offset] + [p[0].upper() + p[1:] for p in parts[offset:]])
+
+def dromedary(s):
+ return s[0].lower() + s[1:]
+
+def scream(*args):
+ return "_".join([a.replace("-", "_").upper() for a in args])
+
+def num(x):
+ if x is not None and x != "":
+ return int(x, 0)
+ else:
+ return None
+
+def klass(nd):
+ parent = nd.parent
+ while parent is not None:
+ if hasattr(parent, "name") and parent.name == "class":
+ return parent
+ parent = parent.parent
+
+untyped = -1
+
+def code(nd):
+ global untyped
+ cd = num(nd["@code"])
+ if cd is None:
+ cd = untyped
+ untyped -= 1
+ return cd
+
+ cls = klass(nd)
+ if cls:
+ cd |= (num(cls["@code"]) << 8)
+ return cd
+
+def root(nd):
+ if nd.parent is None:
+ return nd
+ else:
+ return root(nd.parent)
+
+def qname(nd):
+ name = nd["@name"]
+ cls = klass(nd)
+ if cls != None:
+ return "%s.%s" % (cls["@name"], name)
+ else:
+ return name
+
+def resolve(node, name):
+ spec = root(node)
+ cls = klass(node)
+ if cls:
+ for nd in cls.query["#tag"]:
+ if nd["@name"] == name:
+ return nd
+ for nd in spec.query["amqp/#tag"] + spec.query["amqp/class/#tag"]:
+ if name == qname(nd):
+ return nd
+ raise Exception("unresolved name: %s" % name)
+
+def resolve_type(nd):
+ name = nd["@type"]
+ type = resolve(nd, name)
+ if type.name == "domain" and not type["enum"]:
+ return resolve_type(type)
+ else:
+ return type
+
+TYPES = {
+ "bit": "boolean",
+ "uint8": "short",
+ "uint16": "int",
+ "uint32": "long",
+ "uint64": "long",
+ "datetime": "long",
+ "uuid": "UUID",
+ "sequence-no": "int",
+ "sequence-set": "RangeSet", # XXX
+ "byte-ranges": "RangeSet", # XXX
+ "str8": "String",
+ "str16": "String",
+ "vbin8": "byte[]",
+ "vbin16": "byte[]",
+ "vbin32": "byte[]",
+ "struct32": "Struct",
+ "map": "Map<String,Object>",
+ "array": "List<Object>"
+ }
+
+def cname(nd, field="@name"):
+ cls = klass(nd)
+ if cls:
+ if (nd.name in ("struct", "result") and
+ cls["@name"] != "session" and
+ nd[field] != "header"):
+ return camel(0, nd[field])
+ else:
+ return camel(0, cls["@name"], nd[field])
+ else:
+ return camel(0, nd[field])
+
+def jtype(nd):
+ if nd.name == "struct" or nd["enum"]:
+ return cname(nd)
+ else:
+ return TYPES[nd["@name"]]
+
+REFS = {
+ "boolean": "Boolean",
+ "byte": "Byte",
+ "short": "Short",
+ "int": "Integer",
+ "long": "Long",
+ "float": "Float",
+ "double": "Double",
+ "char": "Character"
+}
+
+def jref(jt):
+ return REFS.get(jt, jt)
+
+def jclass(jt):
+ idx = jt.find('<')
+ if idx > 0:
+ return jt[:idx]
+ else:
+ return jt
+
+DEFAULTS = {
+ "long": 0,
+ "int": 0,
+ "short": 0,
+ "byte": 0,
+ "char": 0,
+ "boolean": "false"
+ }
+
+class Field:
+
+ def __init__(self, index, nd):
+ self.index = index
+ self.name = camel(1, nd["@name"])
+ type_node = resolve_type(nd)
+ tname = cname(type_node)
+ if type_node.name == "struct":
+ self.read = "(%s) dec.readStruct(%s.TYPE)" % (tname, tname)
+ self.write = "enc.writeStruct(%s.TYPE, check(struct).%s)" % (tname, self.name)
+ elif type_node.name == "domain":
+ coder = camel(0, resolve_type(type_node)["@name"])
+ self.read = "%s.get(dec.read%s())" % (tname, coder)
+ self.write = "enc.write%s(check(struct).%s.getValue())" % (coder, self.name)
+ else:
+ coder = camel(0, type_node["@name"])
+ self.read = "dec.read%s()" % coder
+ self.write = "enc.write%s(check(struct).%s)" % (coder, self.name)
+ self.type = jtype(type_node)
+ self.default = DEFAULTS.get(self.type, "null")
+ self.has = camel(1, "has", self.name)
+ self.get = camel(1, "get", self.name)
+ self.set = camel(1, "set", self.name)
+ self.clear = camel(1, "clear", self.name)
+ if self.type == "boolean":
+ self.option = scream(nd["@name"])
+ else:
+ self.option = None
+
+def get_fields(nd):
+ fields = []
+ index = 0
+ for f in nd.query["field"]:
+ fields.append(Field(index, f))
+ index += 1
+ return fields
+
+def get_parameters(fields):
+ params = []
+ options = False
+ for f in fields:
+ if f.option:
+ options = True
+ else:
+ params.append("%s %s" % (f.type, f.name))
+ if options:
+ params.append("Option ... _options")
+ return params
+
+def get_arguments(fields):
+ args = []
+ options = False
+ for f in fields:
+ if f.option:
+ options = True
+ else:
+ args.append(f.name)
+ if options:
+ args.append("_options")
+ return args
+
+def get_options(fields):
+ return [f for f in fields if f.option]
diff --git a/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java b/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java
index d0b3272dec..a72997813a 100644
--- a/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java
+++ b/java/common/src/main/java/org/apache/qpidity/SecurityHelper.java
@@ -22,6 +22,7 @@ package org.apache.qpidity;
import java.io.UnsupportedEncodingException;
import java.util.HashSet;
+import java.util.List;
import java.util.StringTokenizer;
import org.apache.qpidity.security.AMQPCallbackHandler;
@@ -29,13 +30,12 @@ import org.apache.qpidity.security.CallbackHandlerRegistry;
public class SecurityHelper
{
- public static String chooseMechanism(String mechanisms) throws UnsupportedEncodingException
+ public static String chooseMechanism(List<Object> mechanisms) throws UnsupportedEncodingException
{
- StringTokenizer tokenizer = new StringTokenizer(mechanisms, " ");
HashSet mechanismSet = new HashSet();
- while (tokenizer.hasMoreTokens())
+ for (Object m : mechanisms)
{
- mechanismSet.add(tokenizer.nextToken());
+ mechanismSet.add(m);
}
String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms();
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
index 2bd97f3aff..5f9917e30a 100644
--- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
+++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
@@ -59,7 +59,7 @@ class ToyBroker extends SessionDelegate
public void messageAcquire(Session context, MessageAcquire struct)
{
System.out.println("\n==================> messageAcquire " );
- context.messageAcquired(struct.getTransfers());
+ context.executionResult((int) struct.getId(), new Acquired(struct.getTransfers()));
}
@Override public void queueDeclare(Session ssn, QueueDeclare qd)
@@ -68,16 +68,16 @@ class ToyBroker extends SessionDelegate
System.out.println("\n==================> declared queue: " + qd.getQueue() + "\n");
}
- @Override public void queueBind(Session ssn, QueueBind qb)
+ @Override public void exchangeBind(Session ssn, ExchangeBind qb)
{
- exchange.bindQueue(qb.getExchange(), qb.getRoutingKey(),qb.getQueue());
- System.out.println("\n==================> bound queue: " + qb.getQueue() + " with routing key " + qb.getRoutingKey() + "\n");
+ exchange.bindQueue(qb.getExchange(), qb.getBindingKey(),qb.getQueue());
+ System.out.println("\n==================> bound queue: " + qb.getQueue() + " with binding key " + qb.getBindingKey() + "\n");
}
@Override public void queueQuery(Session ssn, QueueQuery qq)
{
QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue());
- ssn.executionResult(qq.getId(), result);
+ ssn.executionResult((int) qq.getId(), result);
}
@Override public void messageSubscribe(Session ssn, MessageSubscribe ms)
@@ -112,7 +112,8 @@ class ToyBroker extends SessionDelegate
{
if (xfr == null || body == null)
{
- ssn.connectionClose(503, "no method segment", 0, 0);
+ ssn.connectionClose(ConnectionCloseCode.FRAMING_ERROR,
+ "no method segment");
ssn.close();
return;
}
@@ -136,7 +137,7 @@ class ToyBroker extends SessionDelegate
{
if (xfr == null || body == null)
{
- ssn.connectionClose(503, "no method segment", 0, 0);
+ ssn.connectionClose(ConnectionCloseCode.FRAMING_ERROR, "no method segment");
ssn.close();
return;
}
@@ -174,14 +175,16 @@ class ToyBroker extends SessionDelegate
{
RangeSet ranges = new RangeSet();
ranges.add(xfr.getId());
- ssn.messageReject(ranges, 0, "no such destination");
+ ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE,
+ "no such destination");
}
}
private void transferMessageToPeer(Session ssn,String dest, Message m)
{
System.out.println("\n==================> Transfering message to: " +dest + "\n");
- ssn.messageTransfer(dest, (short)0, (short)0);
+ ssn.messageTransfer(dest, MessageAcceptMode.EXPLICIT,
+ MessageAcquireMode.PRE_ACQUIRED);
ssn.header(m.header);
for (Data d : m.body)
{
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyClient.java b/java/common/src/main/java/org/apache/qpidity/ToyClient.java
index 10b68bbb20..a3233afcbe 100644
--- a/java/common/src/main/java/org/apache/qpidity/ToyClient.java
+++ b/java/common/src/main/java/org/apache/qpidity/ToyClient.java
@@ -63,7 +63,7 @@ class ToyClient extends SessionDelegate
public static final void main(String[] args)
{
Connection conn = MinaHandler.connect("0.0.0.0", 5672,
- new ConnectionDelegate()
+ new ClientDelegate()
{
public SessionDelegate getSessionDelegate()
{
@@ -80,9 +80,9 @@ class ToyClient extends SessionDelegate
TransportConstants.getVersionMinor())));
Channel ch = conn.getChannel(0);
- Session ssn = new Session();
+ Session ssn = new Session("my-session".getBytes());
ssn.attach(ch);
- ssn.sessionOpen(1234);
+ ssn.sessionAttach(ssn.getName());
ssn.queueDeclare("asdf", null, null);
ssn.sync();
@@ -111,13 +111,15 @@ class ToyClient extends SessionDelegate
map.put("list", Arrays.asList(1, 2, 3));
map.put("binary", new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
- ssn.messageTransfer("asdf", (short) 0, (short) 1);
+ ssn.messageTransfer("asdf", MessageAcceptMode.EXPLICIT,
+ MessageAcquireMode.PRE_ACQUIRED);
ssn.header(new DeliveryProperties(),
new MessageProperties().setApplicationHeaders(map));
ssn.data("this is the data");
ssn.endData();
- ssn.messageTransfer("fdsa", (short) 0, (short) 1);
+ ssn.messageTransfer("fdsa", MessageAcceptMode.EXPLICIT,
+ MessageAcquireMode.PRE_ACQUIRED);
ssn.data("this should be rejected");
ssn.endData();
ssn.sync();
diff --git a/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java b/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java
index 224917ade1..89d7e7917f 100644
--- a/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java
+++ b/java/common/src/main/java/org/apache/qpidity/dtx/XidImpl.java
@@ -241,28 +241,10 @@ public class XidImpl implements Xid
* @return The String representation of this Xid
* @throws QpidException In case of problem when converting this Xid into a string.
*/
- public static String convertToString(Xid xid) throws QpidException
+ public static org.apache.qpidity.transport.Xid convert(Xid xid) throws QpidException
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("converting " + xid + " into a String");
- }
- try
- {
- ByteArrayOutputStream res = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(res);
- out.writeLong(xid.getFormatId());
- byte[] txId = xid.getGlobalTransactionId();
- byte[] brId = xid.getBranchQualifier();
- out.writeByte(txId.length);
- out.writeByte(brId.length);
- out.write(txId);
- out.write(brId);
- return res.toString();
- }
- catch (IOException e)
- {
- throw new QpidException("cannot convert the xid " + xid + " into a String", null, e);
- }
+ return new org.apache.qpidity.transport.Xid(xid.getFormatId(),
+ xid.getGlobalTransactionId(),
+ xid.getBranchQualifier());
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Channel.java b/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
index 7327697088..651402bcb7 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
@@ -77,7 +77,7 @@ public class Channel extends Invoker
connection.getConnectionDelegate().init(this, hdr);
}
- public void method(Void v, Method method)
+ public void control(Void v, Method method)
{
switch (method.getEncodedTrack())
{
@@ -90,15 +90,17 @@ public class Channel extends Invoker
case L3:
method.delegate(session, sessionDelegate);
break;
- case L4:
- method.delegate(session, sessionDelegate);
- break;
default:
throw new IllegalStateException
("unknown track: " + method.getEncodedTrack());
}
}
+ public void command(Void v, Method method)
+ {
+ method.delegate(session, sessionDelegate);
+ }
+
public void header(Void v, Header header)
{
header.delegate(session, sessionDelegate);
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java
index 84621fbe25..7f4365f515 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java
@@ -32,15 +32,14 @@ import java.util.UUID;
class ChannelDelegate extends MethodDelegate<Channel>
{
- public @Override void sessionOpen(Channel channel, SessionOpen open)
+ public @Override void sessionAttach(Channel channel, SessionAttach atch)
{
- Session ssn = new Session();
+ Session ssn = new Session(atch.getName());
ssn.attach(channel);
- long lifetime = open.getDetachedLifetime();
- ssn.sessionAttached(UUID.randomUUID(), lifetime);
+ ssn.sessionAttached(ssn.getName());
}
- public @Override void sessionClosed(Channel channel, SessionClosed closed)
+ public @Override void sessionDetached(Channel channel, SessionDetached closed)
{
channel.getSession().closed();
// XXX: should we remove the channel from the connection? It
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java
new file mode 100644
index 0000000000..699854fb3b
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/transport/ClientDelegate.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport;
+
+
+/**
+ * ClientDelegate
+ *
+ */
+
+public abstract class ClientDelegate extends ConnectionDelegate
+{
+
+ public void init(Channel ch, ProtocolHeader hdr)
+ {
+ if (hdr.getMajor() != TransportConstants.getVersionMajor() &&
+ hdr.getMinor() != TransportConstants.getVersionMinor())
+ {
+ throw new RuntimeException("version missmatch: " + hdr);
+ }
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
index 4815f1025f..cb5f05a185 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
@@ -26,7 +26,10 @@ import org.apache.qpidity.SecurityHelper;
import org.apache.qpidity.QpidException;
import java.io.UnsupportedEncodingException;
+
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -79,17 +82,27 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel>
public void init(Channel ch, ProtocolHeader hdr)
{
- // XXX: hardcoded version
- if (hdr.getMajor() != 0 && hdr.getMinor() != 10)
+ ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader
+ (1,
+ TransportConstants.getVersionMajor(),
+ TransportConstants.getVersionMinor())));
+ if (hdr.getMajor() != TransportConstants.getVersionMajor() &&
+ hdr.getMinor() != TransportConstants.getVersionMinor())
{
// XXX
- ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader(1, TransportConstants.getVersionMajor(),
- TransportConstants.getVersionMinor())));
+ ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader
+ (1,
+ TransportConstants.getVersionMajor(),
+ TransportConstants.getVersionMinor())));
ch.getConnection().close();
}
else
{
- ch.connectionStart(hdr.getMajor(), hdr.getMinor(), null, "PLAIN", "utf8");
+ List<Object> plain = new ArrayList<Object>();
+ plain.add("PLAIN");
+ List<Object> utf8 = new ArrayList<Object>();
+ utf8.add("utf8");
+ ch.connectionStart(null, plain, utf8);
}
}
@@ -99,13 +112,13 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel>
@Override public void connectionStart(Channel context, ConnectionStart struct)
{
String mechanism = null;
- String response = null;
+ byte[] response = null;
try
{
mechanism = SecurityHelper.chooseMechanism(struct.getMechanisms());
saslClient = Sasl.createSaslClient(new String[]{ mechanism },null, "AMQP", "localhost", null,
SecurityHelper.createCallbackHandler(mechanism,_username,_password ));
- response = new String(saslClient.evaluateChallenge(new byte[0]),_locale);
+ response = saslClient.evaluateChallenge(new byte[0]);
}
catch (UnsupportedEncodingException e)
{
@@ -128,13 +141,9 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel>
{
try
{
- String response = new String(saslClient.evaluateChallenge(struct.getChallenge().getBytes()),_locale);
+ byte[] response = saslClient.evaluateChallenge(struct.getChallenge());
context.connectionSecureOk(response);
}
- catch (UnsupportedEncodingException e)
- {
- // need error handling
- }
catch (SaslException e)
{
// need error handling
@@ -144,14 +153,14 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel>
@Override public void connectionTune(Channel context, ConnectionTune struct)
{
// should update the channel max given by the broker.
- context.connectionTuneOk(struct.getChannelMax(), struct.getFrameMax(), struct.getHeartbeat());
+ context.connectionTuneOk(struct.getChannelMax(), struct.getMaxFrameSize(), struct.getHeartbeatMax());
context.connectionOpen(_virtualHost, null, Option.INSIST);
}
@Override public void connectionOpenOk(Channel context, ConnectionOpenOk struct)
{
- String knownHosts = struct.getKnownHosts();
+ List<Object> knownHosts = struct.getKnownHosts();
if(_negotiationCompleteLock != null)
{
_negotiationCompleteLock.lock();
@@ -187,13 +196,13 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel>
byte[] challenge = null;
if ( challenge == null)
{
- context.connectionTune(Integer.MAX_VALUE,maxFrame, 0);
+ context.connectionTune(Integer.MAX_VALUE, maxFrame, 0, Integer.MAX_VALUE);
}
else
{
try
{
- context.connectionSecure(new String(challenge,_locale));
+ context.connectionSecure(challenge);
}
catch(Exception e)
{
@@ -218,16 +227,16 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel>
try
{
saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",new HashMap(),SecurityHelper.createCallbackHandler(_mechanism,_username,_password));
- byte[] challenge = saslServer.evaluateResponse(struct.getResponse().getBytes());
+ byte[] challenge = saslServer.evaluateResponse(struct.getResponse());
if ( challenge == null)
{
- context.connectionTune(Integer.MAX_VALUE,maxFrame, 0);
+ context.connectionTune(Integer.MAX_VALUE, maxFrame, 0, Integer.MAX_VALUE);
}
else
{
try
{
- context.connectionSecure(new String(challenge,_locale));
+ context.connectionSecure(challenge);
}
catch(Exception e)
{
@@ -250,8 +259,9 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel>
@Override public void connectionOpen(Channel context, ConnectionOpen struct)
{
- String hosts = "amqp:1223243232325";
- context.connectionOpenOk(hosts);
+ List<Object> hosts = new ArrayList<Object>();
+ hosts.add("amqp:1223243232325");
+ context.connectionOpenOk(hosts);
}
public String getPassword()
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Data.java b/java/common/src/main/java/org/apache/qpidity/transport/Data.java
index b9b8636d18..72c1c3331d 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Data.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Data.java
@@ -93,7 +93,7 @@ public class Data implements ProtocolEvent
{
str.append(" | ");
}
- str.append(str(buf, 20));
+ str.append(str(buf));
}
str.append(")");
return str.toString();
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Method.java b/java/common/src/main/java/org/apache/qpidity/transport/Method.java
index 7304260333..bc4ec6289d 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Method.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Method.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpidity.transport;
+import org.apache.qpidity.transport.network.Frame;
/**
* Method
@@ -34,11 +35,12 @@ public abstract class Method extends Struct implements ProtocolEvent
{
// XXX: should generate separate factories for separate
// namespaces
- return (Method) Struct.create(type);
+ return (Method) StructFactory.createInstruction(type);
}
// XXX: command subclass?
private long id;
+ private boolean sync = false;
public final long getId()
{
@@ -50,6 +52,16 @@ public abstract class Method extends Struct implements ProtocolEvent
this.id = id;
}
+ public final boolean isSync()
+ {
+ return sync;
+ }
+
+ void setSync(boolean value)
+ {
+ this.sync = value;
+ }
+
public abstract boolean hasPayload();
public abstract byte getEncodedTrack();
@@ -58,7 +70,14 @@ public abstract class Method extends Struct implements ProtocolEvent
public <C> void delegate(C context, ProtocolDelegate<C> delegate)
{
- delegate.method(context, this);
+ if (getEncodedTrack() == Frame.L4)
+ {
+ delegate.command(context, this);
+ }
+ else
+ {
+ delegate.control(context, this);
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java
index 6149c3876a..028d570416 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/ProtocolDelegate.java
@@ -31,7 +31,9 @@ public interface ProtocolDelegate<C>
void init(C context, ProtocolHeader header);
- void method(C context, Method method);
+ void control(C context, Method control);
+
+ void command(C context, Method command);
void header(C context, Header header);
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java b/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java
index 97d4be0d2e..ccbcfa99d0 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java
@@ -98,6 +98,13 @@ public class RangeSet implements Iterable<Range>
ranges.clear();
}
+ public RangeSet copy()
+ {
+ RangeSet copy = new RangeSet();
+ copy.ranges.addAll(ranges);
+ return copy;
+ }
+
public String toString()
{
StringBuffer str = new StringBuffer();
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/java/common/src/main/java/org/apache/qpidity/transport/Session.java
index e4f4af95a5..f6f4062c6d 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Session.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import static org.apache.qpidity.transport.Option.*;
/**
* Session
@@ -56,23 +57,34 @@ public class Session extends Invoker
private static boolean ENABLE_REPLAY = false;
private static final Logger log = Logger.get(Session.class);
+ private byte[] name;
+ private long timeout = 60000;
+
// channel may be null
Channel channel;
// incoming command count
- private long commandsIn = 0;
+ long commandsIn = 0;
// completed incoming commands
private final RangeSet processed = new RangeSet();
- private long processedMark = -1;
private Range syncPoint = null;
// outgoing command count
private long commandsOut = 0;
private Map<Long,Method> commands = new HashMap<Long,Method>();
- private long mark = 0;
+ private long maxComplete = -1;
private AtomicBoolean closed = new AtomicBoolean(false);
+ public Session(byte[] name)
+ {
+ this.name = name;
+ }
+
+ public byte[] getName()
+ {
+ return name;
+ }
public Map<Long,Method> getOutstandingCommands()
{
@@ -133,25 +145,12 @@ public class Session extends Invoker
public void flushProcessed()
{
- boolean first = true;
- RangeSet rest = new RangeSet();
+ RangeSet copy;
synchronized (processed)
{
- for (Range r: processed)
- {
- if (first && r.includes(processedMark))
- {
- processedMark = r.getUpper();
- }
- else
- {
- rest.add(r);
- }
-
- first = false;
- }
+ copy = processed.copy();
}
- executionComplete(processedMark, rest);
+ sessionCompleted(copy);
}
void syncPoint()
@@ -193,34 +192,34 @@ public class Session extends Invoker
log.debug("%s complete(%d, %d)", this, lower, upper);
synchronized (commands)
{
- for (long id = lower; id <= upper; id++)
+ for (long id = maxComplete; id <= upper; id++)
{
commands.remove(id);
}
+ if (lower <= maxComplete + 1)
+ {
+ maxComplete = Math.max(maxComplete, upper);
+ }
commands.notifyAll();
log.debug("%s commands remaining: %s", this, commands);
}
}
- void complete(long mark)
- {
- synchronized (commands)
- {
- complete(this.mark, mark);
- this.mark = mark;
- commands.notifyAll();
- }
- }
-
protected void invoke(Method m)
{
if (m.getEncodedTrack() == Frame.L4)
{
synchronized (commands)
{
- // You only need to keep the command if you need command level replay.
- // If not we only need to keep track of commands to make sync work
- commands.put(commandsOut++,(ENABLE_REPLAY?m:null));
+ long next = commandsOut++;
+ if (next == 0)
+ {
+ sessionCommandPoint(0, 0);
+ }
+ if (ENABLE_REPLAY)
+ {
+ commands.put(next, m);
+ }
channel.method(m);
}
}
@@ -230,7 +229,7 @@ public class Session extends Invoker
}
}
- public void header(Header header)
+ public void header(Header header)
{
channel.header(header);
}
@@ -269,21 +268,32 @@ public class Session extends Invoker
public void sync()
{
+ sync(timeout);
+ }
+
+ public void sync(long timeout)
+ {
log.debug("%s sync()", this);
synchronized (commands)
{
long point = commandsOut - 1;
- if (mark < point)
+ if (maxComplete < point)
{
- executionSync();
+ ExecutionSync sync = new ExecutionSync();
+ sync.setSync(true);
+ invoke(sync);
}
- while (!closed.get() && mark < point)
+ long start = System.currentTimeMillis();
+ long elapsed = 0;
+ while (!closed.get() && elapsed < timeout && maxComplete < point)
{
try {
- log.debug("%s waiting for[%d]: %s", this, point, commands);
- commands.wait();
+ log.debug("%s waiting for[%d]: %d, %s", this, point,
+ maxComplete, commands);
+ commands.wait(timeout - elapsed);
+ elapsed = System.currentTimeMillis() - start;
}
catch (InterruptedException e)
{
@@ -291,9 +301,16 @@ public class Session extends Invoker
}
}
- if (mark < point)
+ if (maxComplete < point)
{
- throw new RuntimeException("session closed");
+ if (closed.get())
+ {
+ throw new RuntimeException("session closed");
+ }
+ else
+ {
+ throw new RuntimeException("timed out waiting for sync");
+ }
}
}
}
@@ -346,16 +363,19 @@ public class Session extends Invoker
}
}
- public T get(long timeout, int nanos)
+ public T get(long timeout)
{
synchronized (this)
{
- while (!closed.get() && !isDone())
+ long start = System.currentTimeMillis();
+ long elapsed = 0;
+ while (!closed.get() && timeout - elapsed > 0 && !isDone())
{
try
{
log.debug("%s waiting for result: %s", Session.this, this);
- wait(timeout, nanos);
+ wait(timeout - elapsed);
+ elapsed = System.currentTimeMillis() - start;
}
catch (InterruptedException e)
{
@@ -364,22 +384,23 @@ public class Session extends Invoker
}
}
- if (!isDone())
+ if (isDone())
+ {
+ return result;
+ }
+ else if (closed.get())
{
throw new RuntimeException("session closed");
}
-
- return result;
- }
-
- public T get(long timeout)
- {
- return get(timeout, 0);
+ else
+ {
+ return null;
+ }
}
public T get()
{
- return get(0);
+ return get(timeout);
}
public boolean isDone()
@@ -396,7 +417,8 @@ public class Session extends Invoker
public void close()
{
- sessionClose();
+ sessionRequestTimeout(0);
+ sessionDetach(name);
// XXX: channel.close();
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java
index 5e36a77a98..442aba0e9b 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java
@@ -35,20 +35,16 @@ public abstract class SessionDelegate
{
public void init(Session ssn, ProtocolHeader hdr) { }
- public void method(Session ssn, Method method) {
- if (method.getEncodedTrack() == Frame.L4)
- {
- method.setId(ssn.nextCommandId());
- }
-
+ public void control(Session ssn, Method method) {
method.dispatch(ssn, this);
+ }
- if (method.getEncodedTrack() == Frame.L4)
+ public void command(Session ssn, Method method) {
+ method.setId(ssn.nextCommandId());
+ method.dispatch(ssn, this);
+ if (!method.hasPayload())
{
- if (!method.hasPayload())
- {
- ssn.processed(method);
- }
+ ssn.processed(method);
}
}
@@ -60,12 +56,12 @@ public abstract class SessionDelegate
@Override public void executionResult(Session ssn, ExecutionResult result)
{
- ssn.result(result.getCommandId(), result.getData());
+ ssn.result(result.getCommandId(), result.getValue());
}
- @Override public void executionComplete(Session ssn, ExecutionComplete excmp)
+ @Override public void sessionCompleted(Session ssn, SessionCompleted cmp)
{
- RangeSet ranges = excmp.getRangedExecutionSet();
+ RangeSet ranges = cmp.getCommands();
if (ranges != null)
{
for (Range range : ranges)
@@ -73,12 +69,27 @@ public abstract class SessionDelegate
ssn.complete(range.getLower(), range.getUpper());
}
}
- ssn.complete(excmp.getCumulativeExecutionMark());
}
- @Override public void executionFlush(Session ssn, ExecutionFlush flush)
+ @Override public void sessionFlush(Session ssn, SessionFlush flush)
+ {
+ if (flush.getCompleted())
+ {
+ ssn.flushProcessed();
+ }
+ if (flush.getConfirmed())
+ {
+ throw new Error("not implemented");
+ }
+ if (flush.getExpected())
+ {
+ throw new Error("not implemented");
+ }
+ }
+
+ @Override public void sessionCommandPoint(Session ssn, SessionCommandPoint scp)
{
- ssn.flushProcessed();
+ ssn.commandsIn = scp.getCommandId();
}
@Override public void executionSync(Session ssn, ExecutionSync sync)
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Struct.java b/java/common/src/main/java/org/apache/qpidity/transport/Struct.java
index 200c3b68e3..f901f9e840 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Struct.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Struct.java
@@ -71,8 +71,6 @@ public abstract class Struct implements Encodable
return type;
}
- public abstract boolean hasTicket();
-
private final boolean isBit(Field<?,?> f)
{
return f.getType().equals(Boolean.class);
@@ -80,14 +78,7 @@ public abstract class Struct implements Encodable
private final boolean packed()
{
- if (this instanceof Method)
- {
- return false;
- }
- else
- {
- return true;
- }
+ return getPackWidth() > 0;
}
private final boolean encoded(Field<?,?> f)
@@ -147,11 +138,6 @@ public abstract class Struct implements Encodable
}
}
- if (hasTicket())
- {
- dec.readShort();
- }
-
for (Field<?,?> f : fields)
{
if (encoded(f))
@@ -187,11 +173,6 @@ public abstract class Struct implements Encodable
}
}
- if (hasTicket())
- {
- enc.writeShort(0x0);
- }
-
for (Field<?,?> f : fields)
{
if (encoded(f))
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java b/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java
index 54429a1a4f..e9a0705de0 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/TransportConstants.java
@@ -2,8 +2,9 @@ package org.apache.qpidity.transport;
public class TransportConstants
{
- private static byte _protocol_version_minor = 0;
- private static byte _protocol_version_major = 99;
+
+ private static byte _protocol_version_minor = 10;
+ private static byte _protocol_version_major = 0;
public static void setVersionMajor(byte value)
{
@@ -24,4 +25,5 @@ public class TransportConstants
{
return _protocol_version_minor;
}
+
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
index ae67483a23..0f6180f54a 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
@@ -20,7 +20,10 @@
*/
package org.apache.qpidity.transport.codec;
+import java.io.UnsupportedEncodingException;
+
import java.util.ArrayList;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -99,19 +102,19 @@ abstract class AbstractDecoder implements Decoder
nbits = 0;
}
- public short readOctet()
+ public short readUint8()
{
return uget();
}
- public int readShort()
+ public int readUint16()
{
int i = uget() << 8;
i |= uget();
return i;
}
- public long readLong()
+ public long readUint32()
{
long l = uget() << 24;
l |= uget() << 16;
@@ -120,7 +123,12 @@ abstract class AbstractDecoder implements Decoder
return l;
}
- public long readLonglong()
+ public int readSequenceNo()
+ {
+ return (int) readUint32();
+ }
+
+ public long readUint64()
{
long l = 0;
for (int i = 0; i < 8; i++)
@@ -130,31 +138,67 @@ abstract class AbstractDecoder implements Decoder
return l;
}
- public long readTimestamp()
+ public long readDatetime()
+ {
+ return readUint64();
+ }
+
+ private static final String decode(byte[] bytes, String charset)
{
- return readLonglong();
+ try
+ {
+ return new String(bytes, charset);
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new RuntimeException(e);
+ }
}
- public String readShortstr()
+ public String readStr8()
+ {
+ short size = readUint8();
+ byte[] bytes = new byte[size];
+ get(bytes);
+ return decode(bytes, "UTF-8");
+ }
+
+ public String readStr16()
{
- short size = readOctet();
+ int size = readUint16();
byte[] bytes = new byte[size];
get(bytes);
- return new String(bytes);
+ return decode(bytes, "UTF-8");
}
- public String readLongstr()
+ public byte[] readVbin8()
{
- long size = readLong();
- byte[] bytes = new byte[(int) size];
+ int size = readUint8();
+ byte[] bytes = new byte[size];
get(bytes);
- return new String(bytes);
+ return bytes;
}
- public RangeSet readRfc1982LongSet()
+ public byte[] readVbin16()
{
- int count = readShort()/8;
+ int size = readUint16();
+ byte[] bytes = new byte[size];
+ get(bytes);
+ return bytes;
+ }
+
+ public byte[] readVbin32()
+ {
+ int size = (int) readUint32();
+ byte[] bytes = new byte[size];
+ get(bytes);
+ return bytes;
+ }
+
+ public RangeSet readSequenceSet()
+ {
+ int count = readUint16()/8;
if (count == 0)
{
return null;
@@ -164,16 +208,21 @@ abstract class AbstractDecoder implements Decoder
RangeSet ranges = new RangeSet();
for (int i = 0; i < count; i++)
{
- ranges.add(readLong(), readLong());
+ ranges.add(readUint32(), readUint32());
}
return ranges;
}
}
+ public RangeSet readByteRanges()
+ {
+ throw new Error("not implemented");
+ }
+
public UUID readUuid()
{
- long msb = readLonglong();
- long lsb = readLonglong();
+ long msb = readUint64();
+ long lsb = readUint64();
return new UUID(msb, lsb);
}
@@ -194,34 +243,39 @@ abstract class AbstractDecoder implements Decoder
return null;
}
}
+ if (type > 0)
+ {
+ int code = readUint16();
+ assert code == type;
+ }
st.read(this);
return st;
}
- public Struct readLongStruct()
+ public Struct readStruct32()
{
- long size = readLong();
+ long size = readUint32();
if (size == 0)
{
return null;
}
else
{
- int type = readShort();
+ int type = readUint16();
Struct result = Struct.create(type);
result.read(this);
return result;
}
}
- public Map<String,Object> readTable()
+ public Map<String,Object> readMap()
{
- long size = readLong();
+ long size = readUint32();
int start = count;
Map<String,Object> result = new LinkedHashMap();
while (count < start + size)
{
- String key = readShortstr();
+ String key = readStr8();
byte code = get();
Type t = getType(code);
Object value = read(t);
@@ -230,9 +284,9 @@ abstract class AbstractDecoder implements Decoder
return result;
}
- public List<Object> readSequence()
+ public List<Object> readList()
{
- long size = readLong();
+ long size = readUint32();
int start = count;
List<Object> result = new ArrayList();
while (count < start + size)
@@ -247,10 +301,15 @@ abstract class AbstractDecoder implements Decoder
public List<Object> readArray()
{
- long size = readLong();
+ long size = readUint32();
+ if (size == 0)
+ {
+ return Collections.EMPTY_LIST;
+ }
+
byte code = get();
Type t = getType(code);
- long count = readLong();
+ long count = readUint32();
List<Object> result = new ArrayList<Object>();
for (int i = 0; i < count; i++)
@@ -291,11 +350,11 @@ abstract class AbstractDecoder implements Decoder
switch (width)
{
case 1:
- return readOctet();
+ return readUint8();
case 2:
- return readShort();
+ return readUint16();
case 4:
- return readLong();
+ return readUint32();
default:
throw new IllegalStateException("illegal width: " + width);
}
@@ -313,81 +372,72 @@ abstract class AbstractDecoder implements Decoder
{
switch (t)
{
- case OCTET:
- case UNSIGNED_BYTE:
- return readOctet();
- case SIGNED_BYTE:
+ case BIN8:
+ case UINT8:
+ return readUint8();
+ case INT8:
return get();
case CHAR:
return (char) get();
case BOOLEAN:
return get() > 0;
- case TWO_OCTETS:
- case UNSIGNED_SHORT:
- return readShort();
+ case BIN16:
+ case UINT16:
+ return readUint16();
- case SIGNED_SHORT:
- return (short) readShort();
+ case INT16:
+ return (short) readUint16();
- case FOUR_OCTETS:
- case UNSIGNED_INT:
- return readLong();
+ case BIN32:
+ case UINT32:
+ return readUint32();
- case UTF32_CHAR:
- case SIGNED_INT:
- return (int) readLong();
+ case CHAR_UTF32:
+ case INT32:
+ return (int) readUint32();
case FLOAT:
- return Float.intBitsToFloat((int) readLong());
+ return Float.intBitsToFloat((int) readUint32());
- case EIGHT_OCTETS:
- case SIGNED_LONG:
- case UNSIGNED_LONG:
+ case BIN64:
+ case UINT64:
+ case INT64:
case DATETIME:
- return readLonglong();
+ return readUint64();
case DOUBLE:
- return Double.longBitsToDouble(readLonglong());
-
- case SIXTEEN_OCTETS:
- case THIRTY_TWO_OCTETS:
- case SIXTY_FOUR_OCTETS:
- case _128_OCTETS:
- case SHORT_BINARY:
- case BINARY:
- case LONG_BINARY:
- return readBytes(t);
+ return Double.longBitsToDouble(readUint64());
case UUID:
return readUuid();
- case SHORT_STRING:
- case SHORT_UTF8_STRING:
- case SHORT_UTF16_STRING:
- case SHORT_UTF32_STRING:
- case STRING:
- case UTF8_STRING:
- case UTF16_STRING:
- case UTF32_STRING:
- case LONG_STRING:
- case LONG_UTF8_STRING:
- case LONG_UTF16_STRING:
- case LONG_UTF32_STRING:
+ case STR8:
+ return readStr8();
+
+ case STR16:
+ return readStr16();
+
+ case STR8_LATIN:
+ case STR8_UTF16:
+ case STR16_LATIN:
+ case STR16_UTF16:
// XXX: need to do character conversion
return new String(readBytes(t));
- case TABLE:
- return readTable();
- case SEQUENCE:
- return readSequence();
+ case MAP:
+ return readMap();
+ case LIST:
+ return readList();
case ARRAY:
return readArray();
+ case STRUCT32:
+ return readStruct32();
- case FIVE_OCTETS:
- case DECIMAL:
- case NINE_OCTETS:
- case LONG_DECIMAL:
+ case BIN40:
+ case DEC32:
+ case BIN72:
+ case DEC64:
// XXX: what types are we supposed to use here?
return readBytes(t);
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
index c2dd205d66..56b4537719 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
@@ -20,8 +20,11 @@
*/
package org.apache.qpidity.transport.codec;
+import java.io.UnsupportedEncodingException;
+
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -48,17 +51,17 @@ abstract class AbstractEncoder implements Encoder
static
{
ENCODINGS.put(Boolean.class, Type.BOOLEAN);
- ENCODINGS.put(String.class, Type.LONG_STRING);
- ENCODINGS.put(Long.class, Type.SIGNED_LONG);
- ENCODINGS.put(Integer.class, Type.SIGNED_INT);
- ENCODINGS.put(Short.class, Type.SIGNED_SHORT);
- ENCODINGS.put(Byte.class, Type.SIGNED_BYTE);
- ENCODINGS.put(Map.class, Type.TABLE);
- ENCODINGS.put(List.class, Type.SEQUENCE);
+ ENCODINGS.put(String.class, Type.STR16);
+ ENCODINGS.put(Long.class, Type.INT64);
+ ENCODINGS.put(Integer.class, Type.INT32);
+ ENCODINGS.put(Short.class, Type.INT16);
+ ENCODINGS.put(Byte.class, Type.INT8);
+ ENCODINGS.put(Map.class, Type.MAP);
+ ENCODINGS.put(List.class, Type.LIST);
ENCODINGS.put(Float.class, Type.FLOAT);
ENCODINGS.put(Double.class, Type.DOUBLE);
ENCODINGS.put(Character.class, Type.CHAR);
- ENCODINGS.put(byte[].class, Type.LONG_BINARY);
+ ENCODINGS.put(byte[].class, Type.VBIN32);
}
protected Sizer sizer()
@@ -120,14 +123,14 @@ abstract class AbstractEncoder implements Encoder
flushBits();
}
- public void writeOctet(short b)
+ public void writeUint8(short b)
{
assert b < 0x100;
put((byte) b);
}
- public void writeShort(int s)
+ public void writeUint16(int s)
{
assert s < 0x10000;
@@ -135,7 +138,7 @@ abstract class AbstractEncoder implements Encoder
put(lsb(s));
}
- public void writeLong(long i)
+ public void writeUint32(long i)
{
assert i < 0x100000000L;
@@ -145,7 +148,12 @@ abstract class AbstractEncoder implements Encoder
put(lsb(i));
}
- public void writeLonglong(long l)
+ public void writeSequenceNo(int i)
+ {
+ writeUint32(i);
+ }
+
+ public void writeUint64(long l)
{
for (int i = 0; i < 8; i++)
{
@@ -154,47 +162,101 @@ abstract class AbstractEncoder implements Encoder
}
- public void writeTimestamp(long l)
+ public void writeDatetime(long l)
+ {
+ writeUint64(l);
+ }
+
+ private static final String checkLength(String s, int n)
+ {
+ if (s == null)
+ {
+ return "";
+ }
+
+ if (s.length() > n)
+ {
+ throw new IllegalArgumentException("string too long: " + s);
+ }
+ else
+ {
+ return s;
+ }
+ }
+
+ private static final byte[] encode(String s, String charset)
+ {
+ try
+ {
+ return s.getBytes(charset);
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void writeStr8(String s)
{
- writeLonglong(l);
+ s = checkLength(s, 255);
+ writeUint8((short) s.length());
+ put(ByteBuffer.wrap(encode(s, "UTF-8")));
}
+ public void writeStr16(String s)
+ {
+ s = checkLength(s, 65535);
+ writeUint16(s.length());
+ put(ByteBuffer.wrap(encode(s, "UTF-8")));
+ }
- public void writeShortstr(String s)
+ public void writeVbin8(byte[] bytes)
{
- if (s == null) { s = ""; }
- if (s.length() > 255) {
- throw new IllegalArgumentException(s);
+ if (bytes == null) { bytes = new byte[0]; }
+ if (bytes.length > 255)
+ {
+ throw new IllegalArgumentException("array too long: " + bytes.length);
}
- writeOctet((short) s.length());
- put(ByteBuffer.wrap(s.getBytes()));
+ writeUint8((short) bytes.length);
+ put(ByteBuffer.wrap(bytes));
}
- public void writeLongstr(String s)
+ public void writeVbin16(byte[] bytes)
{
- if (s == null) { s = ""; }
- writeLong(s.length());
- put(ByteBuffer.wrap(s.getBytes()));
+ if (bytes == null) { bytes = new byte[0]; }
+ writeUint16(bytes.length);
+ put(ByteBuffer.wrap(bytes));
}
+ public void writeVbin32(byte[] bytes)
+ {
+ if (bytes == null) { bytes = new byte[0]; }
+ writeUint32(bytes.length);
+ put(ByteBuffer.wrap(bytes));
+ }
- public void writeRfc1982LongSet(RangeSet ranges)
+ public void writeSequenceSet(RangeSet ranges)
{
if (ranges == null)
{
- writeShort((short) 0);
+ writeUint16((short) 0);
}
else
{
- writeShort(ranges.size() * 8);
+ writeUint16(ranges.size() * 8);
for (Range range : ranges)
{
- writeLong(range.getLower());
- writeLong(range.getUpper());
+ writeUint32(range.getLower());
+ writeUint32(range.getUpper());
}
}
}
+ public void writeByteRanges(RangeSet ranges)
+ {
+ throw new Error("not implemented");
+ }
+
public void writeUuid(UUID uuid)
{
long msb = 0;
@@ -202,15 +264,10 @@ abstract class AbstractEncoder implements Encoder
if (uuid != null)
{
msb = uuid.getMostSignificantBits();
- uuid.getLeastSignificantBits();
+ lsb = uuid.getLeastSignificantBits();
}
- writeLonglong(msb);
- writeLonglong(lsb);
- }
-
- public void writeContent(String c)
- {
- throw new Error("Deprecated");
+ writeUint64(msb);
+ writeUint64(lsb);
}
public void writeStruct(int type, Struct s)
@@ -237,22 +294,27 @@ abstract class AbstractEncoder implements Encoder
}
}
+ if (type > 0)
+ {
+ writeUint16(type);
+ }
+
s.write(this);
}
- public void writeLongStruct(Struct s)
+ public void writeStruct32(Struct s)
{
if (s == null)
{
- writeLong(0);
+ writeUint32(0);
}
else
{
Sizer sizer = sizer();
- sizer.writeShort(s.getEncodedType());
+ sizer.writeUint16(s.getEncodedType());
s.write(sizer);
- writeLong(sizer.size());
- writeShort(s.getEncodedType());
+ writeUint32(sizer.size());
+ writeUint16(s.getEncodedType());
s.write(this);
}
}
@@ -309,46 +371,46 @@ abstract class AbstractEncoder implements Encoder
return null;
}
- public void writeTable(Map<String,Object> table)
+ public void writeMap(Map<String,Object> map)
{
- if (table == null)
+ if (map == null)
{
- writeLong(0);
+ writeUint32(0);
return;
}
Sizer sizer = sizer();
- sizer.writeTable(table);
+ sizer.writeMap(map);
// XXX: - 4
- writeLong(sizer.size() - 4);
- writeTableEntries(table);
+ writeUint32(sizer.size() - 4);
+ writeMapEntries(map);
}
- protected void writeTableEntries(Map<String,Object> table)
+ protected void writeMapEntries(Map<String,Object> map)
{
- for (Map.Entry<String,Object> entry : table.entrySet())
+ for (Map.Entry<String,Object> entry : map.entrySet())
{
String key = entry.getKey();
Object value = entry.getValue();
Type type = encoding(value);
- writeShortstr(key);
+ writeStr8(key);
put(type.code);
write(type, value);
}
}
- public void writeSequence(List<Object> sequence)
+ public void writeList(List<Object> list)
{
Sizer sizer = sizer();
- sizer.writeSequence(sequence);
+ sizer.writeList(list);
// XXX: - 4
- writeLong(sizer.size() - 4);
- writeSequenceEntries(sequence);
+ writeUint32(sizer.size() - 4);
+ writeListEntries(list);
}
- protected void writeSequenceEntries(List<Object> sequence)
+ protected void writeListEntries(List<Object> list)
{
- for (Object value : sequence)
+ for (Object value : list)
{
Type type = encoding(value);
put(type.code);
@@ -358,10 +420,15 @@ abstract class AbstractEncoder implements Encoder
public void writeArray(List<Object> array)
{
+ if (array == null)
+ {
+ array = Collections.EMPTY_LIST;
+ }
+
Sizer sizer = sizer();
sizer.writeArray(array);
// XXX: -4
- writeLong(sizer.size() - 4);
+ writeUint32(sizer.size() - 4);
writeArrayEntries(array);
}
@@ -371,7 +438,7 @@ abstract class AbstractEncoder implements Encoder
if (array.isEmpty())
{
- type = Type.VOID;
+ return;
}
else
{
@@ -380,6 +447,8 @@ abstract class AbstractEncoder implements Encoder
put(type.code);
+ writeUint32(array.size());
+
for (Object value : array)
{
write(type, value);
@@ -409,13 +478,13 @@ abstract class AbstractEncoder implements Encoder
switch (width)
{
case 1:
- writeOctet((short) size);
+ writeUint8((short) size);
break;
case 2:
- writeShort(size);
+ writeUint16(size);
break;
case 4:
- writeLong(size);
+ writeUint32(size);
break;
default:
throw new IllegalStateException("illegal width: " + width);
@@ -444,11 +513,11 @@ abstract class AbstractEncoder implements Encoder
{
switch (t)
{
- case OCTET:
- case UNSIGNED_BYTE:
- writeOctet(coerce(Short.class, value));
+ case BIN8:
+ case UINT8:
+ writeUint8(coerce(Short.class, value));
break;
- case SIGNED_BYTE:
+ case INT8:
put(coerce(Byte.class, value));
break;
case CHAR:
@@ -465,85 +534,78 @@ abstract class AbstractEncoder implements Encoder
}
break;
- case TWO_OCTETS:
- case UNSIGNED_SHORT:
- writeShort(coerce(Integer.class, value));
+ case BIN16:
+ case UINT16:
+ writeUint16(coerce(Integer.class, value));
break;
- case SIGNED_SHORT:
- writeShort(coerce(Short.class, value));
+ case INT16:
+ writeUint16(coerce(Short.class, value));
break;
- case FOUR_OCTETS:
- case UNSIGNED_INT:
- writeLong(coerce(Long.class, value));
+ case BIN32:
+ case UINT32:
+ writeUint32(coerce(Long.class, value));
break;
- case UTF32_CHAR:
- case SIGNED_INT:
- writeLong(coerce(Integer.class, value));
+ case CHAR_UTF32:
+ case INT32:
+ writeUint32(coerce(Integer.class, value));
break;
case FLOAT:
- writeLong(Float.floatToIntBits(coerce(Float.class, value)));
+ writeUint32(Float.floatToIntBits(coerce(Float.class, value)));
break;
- case EIGHT_OCTETS:
- case SIGNED_LONG:
- case UNSIGNED_LONG:
+ case BIN64:
+ case UINT64:
+ case INT64:
case DATETIME:
- writeLonglong(coerce(Long.class, value));
+ writeUint64(coerce(Long.class, value));
break;
case DOUBLE:
long bits = Double.doubleToLongBits(coerce(Double.class, value));
- writeLonglong(bits);
- break;
-
- case SIXTEEN_OCTETS:
- case THIRTY_TWO_OCTETS:
- case SIXTY_FOUR_OCTETS:
- case _128_OCTETS:
- case SHORT_BINARY:
- case BINARY:
- case LONG_BINARY:
- writeBytes(t, coerce(byte[].class, value));
+ writeUint64(bits);
break;
case UUID:
writeUuid(coerce(UUID.class, value));
break;
- case SHORT_STRING:
- case SHORT_UTF8_STRING:
- case SHORT_UTF16_STRING:
- case SHORT_UTF32_STRING:
- case STRING:
- case UTF8_STRING:
- case UTF16_STRING:
- case UTF32_STRING:
- case LONG_STRING:
- case LONG_UTF8_STRING:
- case LONG_UTF16_STRING:
- case LONG_UTF32_STRING:
+ case STR8:
+ writeStr8(coerce(String.class, value));
+ break;
+
+ case STR16:
+ writeStr16(coerce(String.class, value));
+ break;
+
+ case STR8_LATIN:
+ case STR8_UTF16:
+ case STR16_LATIN:
+ case STR16_UTF16:
// XXX: need to do character conversion
writeBytes(t, coerce(String.class, value).getBytes());
break;
- case TABLE:
- writeTable((Map<String,Object>) coerce(Map.class, value));
+ case MAP:
+ writeMap((Map<String,Object>) coerce(Map.class, value));
break;
- case SEQUENCE:
- writeSequence(coerce(List.class, value));
+ case LIST:
+ writeList(coerce(List.class, value));
break;
case ARRAY:
writeArray(coerce(List.class, value));
break;
+ case STRUCT32:
+ writeStruct32(coerce(Struct.class, value));
+ break;
- case FIVE_OCTETS:
- case DECIMAL:
- case NINE_OCTETS:
- case LONG_DECIMAL:
+ case BIN40:
+ case DEC32:
+ case BIN72:
+ case DEC64:
// XXX: what types are we supposed to use here?
writeBytes(t, coerce(byte[].class, value));
break;
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java
index f0738e0a91..df2d208118 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/Decoder.java
@@ -38,26 +38,30 @@ public interface Decoder
{
boolean readBit();
- short readOctet();
- int readShort();
- long readLong();
- long readLonglong();
+ short readUint8();
+ int readUint16();
+ long readUint32();
+ long readUint64();
- long readTimestamp();
-
- String readShortstr();
- String readLongstr();
-
- RangeSet readRfc1982LongSet();
+ long readDatetime();
UUID readUuid();
- String readContent();
+ int readSequenceNo();
+ RangeSet readSequenceSet(); // XXX
+ RangeSet readByteRanges(); // XXX
- Struct readStruct(int type);
- Struct readLongStruct();
+ String readStr8();
+ String readStr16();
- Map<String,Object> readTable();
- List<Object> readSequence();
+ byte[] readVbin8();
+ byte[] readVbin16();
+ byte[] readVbin32();
+
+ Struct readStruct32();
+ Map<String,Object> readMap();
+ List<Object> readList();
List<Object> readArray();
+ Struct readStruct(int type);
+
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java
index 1b2fe0213e..0449ba6702 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/Encoder.java
@@ -40,26 +40,30 @@ public interface Encoder
void flush();
void writeBit(boolean b);
- void writeOctet(short b);
- void writeShort(int s);
- void writeLong(long i);
- void writeLonglong(long l);
+ void writeUint8(short b);
+ void writeUint16(int s);
+ void writeUint32(long i);
+ void writeUint64(long l);
- void writeTimestamp(long l);
-
- void writeShortstr(String s);
- void writeLongstr(String s);
-
- void writeRfc1982LongSet(RangeSet ranges);
+ void writeDatetime(long l);
void writeUuid(UUID uuid);
- void writeContent(String c);
+ void writeSequenceNo(int s);
+ void writeSequenceSet(RangeSet ranges); // XXX
+ void writeByteRanges(RangeSet ranges); // XXX
- void writeStruct(int type, Struct s);
- void writeLongStruct(Struct s);
+ void writeStr8(String s);
+ void writeStr16(String s);
- void writeTable(Map<String,Object> table);
- void writeSequence(List<Object> sequence);
+ void writeVbin8(byte[] bytes);
+ void writeVbin16(byte[] bytes);
+ void writeVbin32(byte[] bytes);
+
+ void writeStruct32(Struct s);
+ void writeMap(Map<String,Object> map);
+ void writeList(List<Object> list);
void writeArray(List<Object> array);
+ void writeStruct(int type, Struct s);
+
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java
index b98bf98239..8ffdd5150b 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java
@@ -38,30 +38,34 @@ public interface Sizer extends Encoder
public static final Sizer NULL = new Sizer()
{
- public void flush() {};
+ public void flush() {}
- public void writeBit(boolean b) {};
- public void writeOctet(short b) {};
- public void writeShort(int s) {};
- public void writeLong(long i) {};
- public void writeLonglong(long l) {};
+ public void writeBit(boolean b) {}
+ public void writeUint8(short b) {}
+ public void writeUint16(int s) {}
+ public void writeUint32(long i) {}
+ public void writeUint64(long l) {}
- public void writeTimestamp(long l) {};
+ public void writeDatetime(long l) {}
+ public void writeUuid(UUID uuid) {}
- public void writeShortstr(String s) {};
- public void writeLongstr(String s) {};
+ public void writeSequenceNo(int s) {}
+ public void writeSequenceSet(RangeSet ranges) {} // XXX
+ public void writeByteRanges(RangeSet ranges) {} // XXX
- public void writeRfc1982LongSet(RangeSet ranges) {};
- public void writeUuid(UUID uuid) {};
+ public void writeStr8(String s) {}
+ public void writeStr16(String s) {}
- public void writeContent(String c) {};
+ public void writeVbin8(byte[] bytes) {}
+ public void writeVbin16(byte[] bytes) {}
+ public void writeVbin32(byte[] bytes) {}
- public void writeStruct(int type, Struct s) {};
- public void writeLongStruct(Struct s) {};
+ public void writeStruct32(Struct s) {}
+ public void writeMap(Map<String,Object> map) {}
+ public void writeList(List<Object> list) {}
+ public void writeArray(List<Object> array) {}
- public void writeTable(Map<String,Object> table) {};
- public void writeSequence(List<Object> sequence) {};
- public void writeArray(List<Object> array) {};
+ public void writeStruct(int type, Struct s) {}
public int getSize() { return 0; }
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java
index 5c35596fd8..d493245f0c 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java
@@ -37,6 +37,7 @@ import org.apache.qpidity.transport.ProtocolError;
import org.apache.qpidity.transport.ProtocolEvent;
import org.apache.qpidity.transport.ProtocolHeader;
import org.apache.qpidity.transport.Receiver;
+import org.apache.qpidity.transport.SegmentType;
import org.apache.qpidity.transport.Struct;
@@ -118,7 +119,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
{
switch (frame.getType())
{
- case Frame.BODY:
+ case BODY:
emit(frame, new Data(frame, frame.isFirstFrame(),
frame.isLastFrame()));
break;
@@ -158,22 +159,29 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
}
}
- private ProtocolEvent decode(Frame frame, byte type, List<ByteBuffer> segment)
+ private ProtocolEvent decode(Frame frame, SegmentType type, List<ByteBuffer> segment)
{
FragmentDecoder dec = new FragmentDecoder(segment.iterator());
switch (type)
{
- case Frame.METHOD:
- int methodType = dec.readShort();
- Method method = Method.create(methodType);
- method.read(dec);
- return method;
- case Frame.HEADER:
+ case CONTROL:
+ int controlType = dec.readUint16();
+ Method control = Method.create(controlType);
+ control.read(dec);
+ return control;
+ case COMMAND:
+ int commandType = dec.readUint16();
+ // read in the session header, right now we don't use it
+ dec.readUint16();
+ Method command = Method.create(commandType);
+ command.read(dec);
+ return command;
+ case HEADER:
List<Struct> structs = new ArrayList();
while (dec.hasRemaining())
{
- structs.add(dec.readLongStruct());
+ structs.add(dec.readStruct32());
}
return new Header(structs,frame.isLastFrame() && frame.isLastSegment());
default:
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java
index d38d8ded98..0357df6e86 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java
@@ -31,6 +31,7 @@ import org.apache.qpidity.transport.ProtocolDelegate;
import org.apache.qpidity.transport.ProtocolError;
import org.apache.qpidity.transport.ProtocolEvent;
import org.apache.qpidity.transport.ProtocolHeader;
+import org.apache.qpidity.transport.SegmentType;
import org.apache.qpidity.transport.Sender;
import org.apache.qpidity.transport.Struct;
@@ -76,50 +77,50 @@ public class Disassembler implements Sender<ConnectionEvent>,
sender.close();
}
- private void fragment(byte flags, byte type, ConnectionEvent event,
+ private void fragment(byte flags, SegmentType type, ConnectionEvent event,
ByteBuffer buf, boolean first, boolean last)
{
if(!buf.hasRemaining())
{
//empty data
- byte nflags = flags;
+ byte nflags = flags;
if (first)
{
nflags |= FIRST_FRAME;
first = false;
}
- nflags |= LAST_FRAME;
- Frame frame = new Frame(nflags, type,
+ nflags |= LAST_FRAME;
+ Frame frame = new Frame(nflags, type,
event.getProtocolEvent().getEncodedTrack(),
event.getChannel());
- // frame.addFragment(buf);
+ // frame.addFragment(buf);
sender.send(frame);
}
else
{
- while (buf.hasRemaining())
- {
- ByteBuffer slice = buf.slice();
- slice.limit(min(maxPayload, slice.remaining()));
- buf.position(buf.position() + slice.remaining());
-
- byte newflags = flags;
- if (first)
+ while (buf.hasRemaining())
{
- newflags |= FIRST_FRAME;
- first = false;
+ ByteBuffer slice = buf.slice();
+ slice.limit(min(maxPayload, slice.remaining()));
+ buf.position(buf.position() + slice.remaining());
+
+ byte newflags = flags;
+ if (first)
+ {
+ newflags |= FIRST_FRAME;
+ first = false;
+ }
+ if (last && !buf.hasRemaining())
+ {
+ newflags |= LAST_FRAME;
+ }
+
+ Frame frame = new Frame(newflags, type,
+ event.getProtocolEvent().getEncodedTrack(),
+ event.getChannel());
+ frame.addFragment(slice);
+ sender.send(frame);
}
- if (last && !buf.hasRemaining())
- {
- newflags |= LAST_FRAME;
- }
-
- Frame frame = new Frame(newflags, type,
- event.getProtocolEvent().getEncodedTrack(),
- event.getChannel());
- frame.addFragment(slice);
- sender.send(frame);
- }
}
}
@@ -128,15 +129,40 @@ public class Disassembler implements Sender<ConnectionEvent>,
sender.send(header);
}
- public void method(ConnectionEvent event, Method method)
+ public void control(ConnectionEvent event, Method method)
+ {
+ method(event, method, SegmentType.CONTROL);
+ }
+
+ public void command(ConnectionEvent event, Method method)
+ {
+ method(event, method, SegmentType.COMMAND);
+ }
+
+ private void method(ConnectionEvent event, Method method, SegmentType type)
{
SizeEncoder sizer = new SizeEncoder();
- sizer.writeShort(method.getEncodedType());
+ sizer.writeUint16(method.getEncodedType());
+ if (type == SegmentType.COMMAND)
+ {
+ sizer.writeUint16(0);
+ }
method.write(sizer);
ByteBuffer buf = ByteBuffer.allocate(sizer.size());
BBEncoder enc = new BBEncoder(buf);
- enc.writeShort(method.getEncodedType());
+ enc.writeUint16(method.getEncodedType());
+ if (type == SegmentType.COMMAND)
+ {
+ if (method.isSync())
+ {
+ enc.writeUint16(0x0101);
+ }
+ else
+ {
+ enc.writeUint16(0x0100);
+ }
+ }
method.write(enc);
enc.flush();
buf.flip();
@@ -148,7 +174,7 @@ public class Disassembler implements Sender<ConnectionEvent>,
flags |= LAST_SEG;
}
- fragment(flags, METHOD, event, buf, true, true);
+ fragment(flags, type, event, buf, true, true);
}
public void header(ConnectionEvent event, Header header)
@@ -159,24 +185,24 @@ public class Disassembler implements Sender<ConnectionEvent>,
SizeEncoder sizer = new SizeEncoder();
for (Struct st : header.getStructs())
{
- sizer.writeLongStruct(st);
+ sizer.writeStruct32(st);
}
buf = ByteBuffer.allocate(sizer.size());
BBEncoder enc = new BBEncoder(buf);
for (Struct st : header.getStructs())
{
- enc.writeLongStruct(st);
+ enc.writeStruct32(st);
enc.flush();
}
header.setBuf(buf);
}
else
{
- buf = header.getBuf();
+ buf = header.getBuf();
}
buf.flip();
- fragment((byte) 0x0, HEADER, event, buf, true, true);
+ fragment((byte) 0x0, SegmentType.HEADER, event, buf, true, true);
}
public void data(ConnectionEvent event, Data data)
@@ -187,7 +213,7 @@ public class Disassembler implements Sender<ConnectionEvent>,
{
ByteBuffer buf = it.next();
boolean last = data.isLast() && !it.hasNext();
- fragment(LAST_SEG, BODY, event, buf, first, last);
+ fragment(LAST_SEG, SegmentType.BODY, event, buf, first, last);
first = false;
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java
index a5c5db4dba..c5fb7b9986 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpidity.transport.network;
+import org.apache.qpidity.transport.SegmentType;
import org.apache.qpidity.transport.util.SliceIterator;
import java.nio.ByteBuffer;
@@ -48,10 +49,6 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer>
public static final byte L3 = 2;
public static final byte L4 = 3;
- public static final byte METHOD = 1;
- public static final byte HEADER = 2;
- public static final byte BODY = 3;
-
public static final byte RESERVED = 0x0;
public static final byte VERSION = 0x0;
@@ -62,13 +59,13 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer>
public static final byte LAST_FRAME = 0x1;
final private byte flags;
- final private byte type;
+ final private SegmentType type;
final private byte track;
final private int channel;
final private List<ByteBuffer> fragments;
private int size;
- public Frame(byte flags, byte type, byte track, int channel)
+ public Frame(byte flags, SegmentType type, byte track, int channel)
{
this.flags = flags;
this.type = type;
@@ -99,7 +96,7 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer>
return size;
}
- public byte getType()
+ public SegmentType getType()
{
return type;
}
@@ -153,7 +150,7 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer>
{
StringBuilder str = new StringBuilder();
str.append(String.format
- ("[%05d %05d %1d %1d %d%d%d%d]", getChannel(), getSize(),
+ ("[%05d %05d %1d %1d %d%d%d%d] ", getChannel(), getSize(),
getTrack(), getType(),
isFirstSegment() ? 1 : 0, isLastSegment() ? 1 : 0,
isFirstFrame() ? 1 : 0, isLastFrame() ? 1 : 0));
@@ -170,7 +167,7 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer>
str.append(" | ");
}
- str.append(str(buf, 20));
+ str.append(str(buf));
}
return str.toString();
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
index 871c45743e..2d41a9f516 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
@@ -22,10 +22,10 @@ package org.apache.qpidity.transport.network;
import java.nio.ByteBuffer;
-import org.apache.qpidity.transport.Constant;
import org.apache.qpidity.transport.ProtocolError;
import org.apache.qpidity.transport.ProtocolHeader;
import org.apache.qpidity.transport.Receiver;
+import org.apache.qpidity.transport.SegmentType;
import static org.apache.qpidity.transport.util.Functions.*;
@@ -77,7 +77,7 @@ public class InputHandler implements Receiver<ByteBuffer>
private byte minor;
private byte flags;
- private byte type;
+ private SegmentType type;
private byte track;
private int channel;
private int size;
@@ -146,7 +146,7 @@ public class InputHandler implements Receiver<ByteBuffer>
flags = buf.get();
return FRAME_HDR_TYPE;
case FRAME_HDR_TYPE:
- type = buf.get();
+ type = SegmentType.get(buf.get());
return FRAME_HDR_SIZE1;
case FRAME_HDR_SIZE1:
size = (0xFF & buf.get()) << 8;
@@ -218,7 +218,7 @@ public class InputHandler implements Receiver<ByteBuffer>
return FRAME_END;
}
case FRAME_END:
- return expect(buf, Constant.FRAME_END, FRAME_HDR);
+ return expect(buf, OutputHandler.FRAME_END, FRAME_HDR);
default:
throw new IllegalStateException();
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
index 9f770bcb1c..8f615cf80d 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
@@ -67,11 +67,13 @@ public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate
}
}
+ public static final int FRAME_END = 0xCE;
+
public void frame(Frame frame)
{
ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE + frame.getSize() + 1);
hdr.put(frame.getFlags());
- hdr.put(frame.getType());
+ hdr.put((byte) frame.getType().getValue());
hdr.putShort((short) (frame.getSize() + HEADER_SIZE));
hdr.put(RESERVED);
hdr.put(frame.getTrack());
@@ -84,7 +86,7 @@ public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate
{
hdr.put(buf);
}
- hdr.put((byte) Constant.FRAME_END);
+ hdr.put((byte) FRAME_END);
hdr.flip();
synchronized (lock)
{
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java
index 2e4875cf42..7da719087c 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java
@@ -200,7 +200,7 @@ public class MinaHandler<E> implements IoHandler
ConnectionDelegate delegate)
{
return connect(host, port, new ConnectionBinding
- (delegate, InputHandler.State.FRAME_HDR));
+ (delegate, InputHandler.State.PROTO_HDR));
}
private static class ConnectionBinding
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java b/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java
index 7dbb3d30e8..14e756c047 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java
@@ -52,15 +52,24 @@ public class Functions
public static final String str(ByteBuffer buf, int limit)
{
StringBuilder str = new StringBuilder();
+ str.append('"');
+
for (int i = 0; i < min(buf.remaining(), limit); i++)
{
- if (i > 0 && i % 2 == 0)
+ byte c = buf.get(buf.position() + i);
+
+ if (c > 31 && c < 127 && c != '\\')
{
- str.append(" ");
+ str.append((char)c);
+ }
+ else
+ {
+ str.append(String.format("\\x%02x", c));
}
- str.append(String.format("%02x", buf.get(buf.position() + i)));
}
+ str.append('"');
+
return str.toString();
}
diff --git a/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java
index d7ece9b745..1a83786e12 100644
--- a/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java
+++ b/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java
@@ -100,12 +100,12 @@ public class ConnectionTest extends TestCase
}
Channel ch = conn.getChannel(0);
- Session ssn = new Session();
+ Session ssn = new Session("test".getBytes());
ssn.attach(ch);
try
{
- ssn.sessionOpen(1234);
+ ssn.sessionAttach(ssn.getName());
fail("writing to a closed socket succeeded");
}
catch (TransportException e)
diff --git a/java/common/templating.py b/java/common/templating.py
new file mode 100644
index 0000000000..832b7ecb9c
--- /dev/null
+++ b/java/common/templating.py
@@ -0,0 +1,101 @@
+
+class Parser:
+
+ def __init__(self, **kwargs):
+ self.output = ""
+ self.environ = {"out": self.parse}
+ for k, v in kwargs.items():
+ self.environ[k] = v
+ self.text = ""
+ self.level = 0
+ self.line = None
+
+ def action(self, actor):
+ text = self.text
+ self.text = ""
+ actor(text)
+
+ def out(self, text):
+ self.output += text
+
+ def prefix_lines(self, text):
+ return "%s%s" % ("\n"*(self.line - 1 - text.count("\n")), text)
+
+ def evaluate(self, text):
+ self.out(str(eval(self.prefix_lines(text), self.environ, self.environ)))
+
+ def execute(self, text):
+ exec self.prefix_lines(text) in self.environ, self.environ
+
+ def parse(self, input):
+ old_line = self.line
+ try:
+ state = self.start
+ self.line = 1
+ for ch in input:
+ state = state(ch)
+ if ch == "\n":
+ self.line += 1
+ if state == self.start:
+ self.action(self.out)
+ elif state == self.alnum:
+ self.action(self.evaluate)
+ else:
+ raise ParseError()
+ finally:
+ self.line = old_line
+
+ def start(self, ch):
+ if ch == "$":
+ return self.dollar
+ else:
+ self.text += ch
+ return self.start
+
+ def dollar(self, ch):
+ if ch == "$":
+ self.text += "$"
+ return self.start
+ elif ch == "(":
+ self.action(self.out)
+ return self.expression
+ elif ch == "{":
+ self.action(self.out)
+ return self.block
+ else:
+ self.action(self.out)
+ self.text += ch
+ return self.alnum
+
+ def alnum(self, ch):
+ if ch.isalnum():
+ self.text += ch
+ return self.alnum
+ else:
+ self.action(self.evaluate)
+ self.text += ch
+ return self.start
+
+ def match(self, ch, start, end):
+ if ch == start:
+ self.level += 1
+ if ch == end:
+ self.level -= 1
+
+ def block(self, ch):
+ if not self.level and ch == "}":
+ self.action(self.execute)
+ return self.start
+ else:
+ self.match(ch, "{", "}")
+ self.text += ch
+ return self.block
+
+ def expression(self, ch):
+ if not self.level and ch == ")":
+ self.action(self.evaluate)
+ return self.start
+ else:
+ self.match(ch, "(", ")")
+ self.text += ch
+ return self.expression