summaryrefslogtreecommitdiff
path: root/java/common
diff options
context:
space:
mode:
Diffstat (limited to 'java/common')
-rw-r--r--java/common/Composite.tpl10
-rw-r--r--java/common/Invoker.tpl6
-rw-r--r--java/common/Option.tpl3
-rw-r--r--java/common/genutil.py8
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java5
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ToyBroker.java12
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Binary.java129
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Channel.java12
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Connection.java7
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Data.java35
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Echo.java5
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Method.java33
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Sender.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Session.java15
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java68
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java159
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/codec/BBDecoder.java21
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/codec/BBEncoder.java168
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/codec/FragmentDecoder.java112
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/codec/SizeEncoder.java134
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java76
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java85
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java68
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java44
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java22
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java57
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java5
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java5
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java5
29 files changed, 693 insertions, 618 deletions
diff --git a/java/common/Composite.tpl b/java/common/Composite.tpl
index 46a45b0b91..5df1ef44fb 100644
--- a/java/common/Composite.tpl
+++ b/java/common/Composite.tpl
@@ -80,7 +80,7 @@ if pack > 0:
out(" private $(PACK_TYPES[pack]) packing_flags = 0;\n");
fields = get_fields(type)
-params = get_parameters(fields)
+params = get_parameters(type, fields)
options = get_options(fields)
for f in fields:
@@ -99,7 +99,7 @@ for f in fields:
if f.option: continue
out(" $(f.set)($(f.name));\n")
-if options:
+if options or base == "Method":
out("""
for (int i=0; i < _options.length; i++) {
switch (_options[i]) {
@@ -108,7 +108,11 @@ if options:
for f in options:
out(" case $(f.option): packing_flags |= $(f.flag_mask(pack)); break;\n")
- out(""" case NO_OPTION: break;
+ if base == "Method":
+ out(""" case SYNC: this.setSync(true); break;
+ case BATCH: this.setBatch(true); break;
+""")
+ out(""" case NONE: break;
default: throw new IllegalArgumentException("invalid option: " + _options[i]);
}
}
diff --git a/java/common/Invoker.tpl b/java/common/Invoker.tpl
index d9905c71a0..4e174619f0 100644
--- a/java/common/Invoker.tpl
+++ b/java/common/Invoker.tpl
@@ -15,8 +15,8 @@ from genutil import *
for c in composites:
name = cname(c)
fields = get_fields(c)
- params = get_parameters(fields)
- args = get_arguments(fields)
+ params = get_parameters(c, fields)
+ args = get_arguments(c, fields)
result = c["result"]
if result:
if not result["@type"]:
@@ -32,7 +32,7 @@ for c in composites:
jclass = ""
out("""
- public $jresult $(dromedary(name))($(", ".join(params))) {
+ public final $jresult $(dromedary(name))($(", ".join(params))) {
$(jreturn)invoke(new $name($(", ".join(args)))$jclass);
}
""")
diff --git a/java/common/Option.tpl b/java/common/Option.tpl
index 5fa2b95b9f..3228949d87 100644
--- a/java/common/Option.tpl
+++ b/java/common/Option.tpl
@@ -15,5 +15,6 @@ for c in composites:
if not options.has_key(option):
options[option] = None
out(" $option,\n")}
- NO_OPTION
+ BATCH,
+ NONE
}
diff --git a/java/common/genutil.py b/java/common/genutil.py
index 9636a91cc3..2f1caa41c4 100644
--- a/java/common/genutil.py
+++ b/java/common/genutil.py
@@ -206,7 +206,7 @@ def get_fields(nd):
index += 1
return fields
-def get_parameters(fields):
+def get_parameters(type, fields):
params = []
options = False
for f in fields:
@@ -214,11 +214,11 @@ def get_parameters(fields):
options = True
else:
params.append("%s %s" % (f.type, f.name))
- if options:
+ if options or type.name in ("control", "command"):
params.append("Option ... _options")
return params
-def get_arguments(fields):
+def get_arguments(type, fields):
args = []
options = False
for f in fields:
@@ -226,7 +226,7 @@ def get_arguments(fields):
options = True
else:
args.append(f.name)
- if options:
+ if options or type.name in ("control", "command"):
args.append("_options")
return args
diff --git a/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java b/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java
index 4e05aa574c..6262bd25c6 100644
--- a/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java
+++ b/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java
@@ -41,6 +41,11 @@ public class ConsoleOutput implements Sender<ByteBuffer>
System.out.println(str(buf));
}
+ public void flush()
+ {
+ // pass
+ }
+
public void close()
{
System.out.println("CLOSED");
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
index 5f9917e30a..0055855c0a 100644
--- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
+++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
@@ -188,10 +188,7 @@ class ToyBroker extends SessionDelegate
ssn.header(m.header);
for (Data d : m.body)
{
- for (ByteBuffer b : d.getFragments())
- {
- ssn.data(b);
- }
+ ssn.data(d.getData());
}
ssn.endData();
}
@@ -245,11 +242,8 @@ class ToyBroker extends SessionDelegate
for (Data d : body)
{
- for (ByteBuffer b : d.getFragments())
- {
- sb.append(" | ");
- sb.append(str(b));
- }
+ sb.append(" | ");
+ sb.append(d);
}
return sb.toString();
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Binary.java b/java/common/src/main/java/org/apache/qpidity/transport/Binary.java
new file mode 100644
index 0000000000..1a1112d424
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Binary.java
@@ -0,0 +1,129 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport;
+
+
+/**
+ * Binary
+ *
+ */
+
+public final class Binary
+{
+
+ private byte[] bytes;
+ private int offset;
+ private int size;
+ private int hash = 0;
+
+ public Binary(byte[] bytes, int offset, int size)
+ {
+ if (offset + size > bytes.length)
+ {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+
+ this.bytes = bytes;
+ this.offset = offset;
+ this.size = size;
+ }
+
+ public Binary(byte[] bytes)
+ {
+ this(bytes, 0, bytes.length);
+ }
+
+ public final byte[] array()
+ {
+ return bytes;
+ }
+
+ public final int offset()
+ {
+ return offset;
+ }
+
+ public final int size()
+ {
+ return size;
+ }
+
+ public final Binary slice(int low, int high)
+ {
+ int sz;
+
+ if (high < 0)
+ {
+ sz = size + high;
+ }
+ else
+ {
+ sz = high - low;
+ }
+
+ if (sz < 0)
+ {
+ sz = 0;
+ }
+
+ return new Binary(bytes, offset + low, sz);
+ }
+
+ public final int hashCode()
+ {
+ if (hash == 0)
+ {
+ int hc = 0;
+ for (int i = 0; i < size; i++)
+ {
+ hc = 31*hc + (0xFF & bytes[offset + i]);
+ }
+ hash = hc;
+ }
+
+ return hash;
+ }
+
+ public final boolean equals(Object o)
+ {
+ if (!(o instanceof Binary))
+ {
+ return false;
+ }
+
+ Binary buf = (Binary) o;
+ if (this.size != buf.size)
+ {
+ return false;
+ }
+
+ for (int i = 0; i < size; i++)
+ {
+ if (bytes[offset + i] != buf.bytes[buf.offset + i])
+ {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Channel.java b/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
index fb8918eb7b..eb37ce1590 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
@@ -56,6 +56,7 @@ public class Channel extends Invoker
private Lock commandLock = new ReentrantLock();
private boolean first = true;
private ByteBuffer data = null;
+ private boolean batch = false;
public Channel(Connection connection, int channel, SessionDelegate delegate)
{
@@ -162,6 +163,13 @@ public class Channel extends Invoker
emit(m);
+ if (!m.isBatch() && !m.hasPayload())
+ {
+ connection.flush();
+ }
+
+ batch = m.isBatch();
+
if (m.getEncodedTrack() == Frame.L4 && !m.hasPayload())
{
commandLock.unlock();
@@ -199,6 +207,10 @@ public class Channel extends Invoker
emit(new Data(data, first, true));
first = true;
data = null;
+ if (!batch)
+ {
+ connection.flush();
+ }
commandLock.unlock();
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Connection.java b/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
index 9829343491..15116be1c3 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
@@ -40,7 +40,6 @@ import java.nio.ByteBuffer;
* short instead of Short
*/
-// RA making this public until we sort out the package issues
public class Connection
implements Receiver<ConnectionEvent>, Sender<ConnectionEvent>
{
@@ -90,6 +89,12 @@ public class Connection
sender.send(event);
}
+ public void flush()
+ {
+ log.debug("FLUSH: [%s]", this);
+ sender.flush();
+ }
+
public int getChannelMax()
{
return channelMax;
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Data.java b/java/common/src/main/java/org/apache/qpidity/transport/Data.java
index 4f61380809..8792518834 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Data.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Data.java
@@ -37,25 +37,20 @@ import static org.apache.qpidity.transport.util.Functions.*;
public class Data implements ProtocolEvent
{
- private final Iterable<ByteBuffer> fragments;
+ private final ByteBuffer data;
private final boolean first;
private final boolean last;
- public Data(Iterable<ByteBuffer> fragments, boolean first, boolean last)
+ public Data(ByteBuffer data, boolean first, boolean last)
{
- this.fragments = fragments;
+ this.data = data;
this.first = first;
this.last = last;
}
- public Data(ByteBuffer buf, boolean first, boolean last)
+ public ByteBuffer getData()
{
- this(Collections.singletonList(buf), first, last);
- }
-
- public Iterable<ByteBuffer> getFragments()
- {
- return fragments;
+ return data.slice();
}
public boolean isFirst()
@@ -82,25 +77,7 @@ public class Data implements ProtocolEvent
{
StringBuffer str = new StringBuffer();
str.append("Data(");
- boolean first = true;
- int left = 64;
- for (ByteBuffer buf : getFragments())
- {
- if (first)
- {
- first = false;
- }
- else
- {
- str.append(" | ");
- }
- str.append(str(buf, left));
- left -= buf.remaining();
- if (left < 0)
- {
- break;
- }
- }
+ str.append(str(data, 64));
str.append(")");
return str.toString();
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Echo.java b/java/common/src/main/java/org/apache/qpidity/transport/Echo.java
index 03d0d3e161..ed323c7eac 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Echo.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Echo.java
@@ -49,10 +49,7 @@ public class Echo extends SessionDelegate
public void data(Session ssn, Data data)
{
- for (ByteBuffer buf : data.getFragments())
- {
- ssn.data(buf);
- }
+ ssn.data(data.getData());
if (data.isLast())
{
ssn.endData();
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Method.java b/java/common/src/main/java/org/apache/qpidity/transport/Method.java
index f72ebd570c..a0605e6e66 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Method.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Method.java
@@ -42,6 +42,7 @@ public abstract class Method extends Struct implements ProtocolEvent
private int id;
private boolean idSet = false;
private boolean sync = false;
+ private boolean batch = false;
public final int getId()
{
@@ -59,11 +60,21 @@ public abstract class Method extends Struct implements ProtocolEvent
return sync;
}
- void setSync(boolean value)
+ final void setSync(boolean value)
{
this.sync = value;
}
+ public final boolean isBatch()
+ {
+ return batch;
+ }
+
+ final void setBatch(boolean value)
+ {
+ this.batch = value;
+ }
+
public abstract boolean hasPayload();
public abstract byte getEncodedTrack();
@@ -84,26 +95,30 @@ public abstract class Method extends Struct implements ProtocolEvent
public String toString()
{
- if (getEncodedTrack() != Frame.L4)
- {
- return super.toString();
- }
-
StringBuilder str = new StringBuilder();
- if (idSet)
+ if (getEncodedTrack() == Frame.L4 && idSet)
{
str.append("id=");
str.append(id);
}
- if (sync)
+ if (sync || batch)
{
if (str.length() > 0)
{
str.append(" ");
}
- str.append(" [sync]");
+ str.append("[");
+ if (sync)
+ {
+ str.append("S");
+ }
+ if (batch)
+ {
+ str.append("B");
+ }
+ str.append("]");
}
if (str.length() > 0)
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Sender.java b/java/common/src/main/java/org/apache/qpidity/transport/Sender.java
index 6da8358bd6..ba3e67e578 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Sender.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Sender.java
@@ -31,6 +31,8 @@ public interface Sender<T>
void send(T msg);
+ void flush();
+
void close();
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/java/common/src/main/java/org/apache/qpidity/transport/Session.java
index 988ac4788f..ca572119d9 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Session.java
@@ -130,7 +130,7 @@ public class Session extends Invoker
log.debug("ID: [%s] %s", this.channel, id);
if ((id % 65536) == 0)
{
- flushProcessed(true);
+ flushProcessed(TIMELY_REPLY);
}
}
@@ -166,19 +166,14 @@ public class Session extends Invoker
}
}
- public void flushProcessed()
- {
- flushProcessed(false);
- }
-
- private void flushProcessed(boolean timely_reply)
+ public void flushProcessed(Option ... options)
{
RangeSet copy;
synchronized (processedLock)
{
copy = processed.copy();
}
- sessionCompleted(copy, timely_reply ? TIMELY_REPLY : NO_OPTION);
+ sessionCompleted(copy, options);
}
void knownComplete(RangeSet kc)
@@ -353,9 +348,7 @@ public class Session extends Invoker
if (needSync && lt(maxComplete, point))
{
- ExecutionSync sync = new ExecutionSync();
- sync.setSync(true);
- invoke(sync);
+ executionSync(SYNC);
}
long start = System.currentTimeMillis();
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
index ebfc6b120f..77899ad712 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpidity.transport.Binary;
import org.apache.qpidity.transport.RangeSet;
import org.apache.qpidity.transport.Struct;
import org.apache.qpidity.transport.Type;
@@ -45,6 +46,14 @@ import static org.apache.qpidity.transport.util.Functions.*;
abstract class AbstractDecoder implements Decoder
{
+ private final Map<Binary,String> str8cache = new LinkedHashMap<Binary,String>()
+ {
+ @Override protected boolean removeEldestEntry(Map.Entry<Binary,String> me)
+ {
+ return size() > 4*1024;
+ }
+ };
+
protected abstract byte doGet();
protected abstract void doGet(byte[] bytes);
@@ -59,6 +68,13 @@ abstract class AbstractDecoder implements Decoder
doGet(bytes);
}
+ protected Binary get(int size)
+ {
+ byte[] bytes = new byte[size];
+ get(bytes);
+ return new Binary(bytes);
+ }
+
protected short uget()
{
return (short) (0xFF & get());
@@ -105,11 +121,11 @@ abstract class AbstractDecoder implements Decoder
return readUint64();
}
- private static final String decode(byte[] bytes, String charset)
+ private static final String decode(byte[] bytes, int offset, int length, String charset)
{
try
{
- return new String(bytes, charset);
+ return new String(bytes, offset, length, charset);
}
catch (UnsupportedEncodingException e)
{
@@ -117,13 +133,22 @@ abstract class AbstractDecoder implements Decoder
}
}
+ private static final String decode(byte[] bytes, String charset)
+ {
+ return decode(bytes, 0, bytes.length, charset);
+ }
public String readStr8()
{
short size = readUint8();
- byte[] bytes = new byte[size];
- get(bytes);
- return decode(bytes, "UTF-8");
+ Binary bin = get(size);
+ String str = str8cache.get(bin);
+ if (str == null)
+ {
+ str = decode(bin.array(), bin.offset(), bin.size(), "UTF-8");
+ str8cache.put(bin, str);
+ }
+ return str;
}
public String readStr16()
@@ -233,7 +258,19 @@ abstract class AbstractDecoder implements Decoder
public Map<String,Object> readMap()
{
long size = readUint32();
+
+ if (size == 0)
+ {
+ return null;
+ }
+
long count = readUint32();
+
+ if (count == 0)
+ {
+ return Collections.EMPTY_MAP;
+ }
+
Map<String,Object> result = new LinkedHashMap();
for (int i = 0; i < count; i++)
{
@@ -243,13 +280,26 @@ abstract class AbstractDecoder implements Decoder
Object value = read(t);
result.put(key, value);
}
+
return result;
}
public List<Object> readList()
{
long size = readUint32();
+
+ if (size == 0)
+ {
+ return null;
+ }
+
long count = readUint32();
+
+ if (count == 0)
+ {
+ return Collections.EMPTY_LIST;
+ }
+
List<Object> result = new ArrayList();
for (int i = 0; i < count; i++)
{
@@ -264,15 +314,21 @@ abstract class AbstractDecoder implements Decoder
public List<Object> readArray()
{
long size = readUint32();
+
if (size == 0)
{
- return Collections.EMPTY_LIST;
+ return null;
}
byte code = get();
Type t = getType(code);
long count = readUint32();
+ if (count == 0)
+ {
+ return Collections.EMPTY_LIST;
+ }
+
List<Object> result = new ArrayList<Object>();
for (int i = 0; i < count; i++)
{
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
index aa90627943..8908b94ed3 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -64,10 +65,13 @@ abstract class AbstractEncoder implements Encoder
ENCODINGS.put(byte[].class, Type.VBIN32);
}
- protected Sizer sizer()
+ private final Map<String,byte[]> str8cache = new LinkedHashMap<String,byte[]>()
{
- return new SizeEncoder();
- }
+ @Override protected boolean removeEldestEntry(Map.Entry<String,byte[]> me)
+ {
+ return size() > 4*1024;
+ }
+ };
protected abstract void doPut(byte b);
@@ -88,6 +92,15 @@ abstract class AbstractEncoder implements Encoder
put(ByteBuffer.wrap(bytes));
}
+ protected abstract int beginSize8();
+ protected abstract void endSize8(int pos);
+
+ protected abstract int beginSize16();
+ protected abstract void endSize16(int pos);
+
+ protected abstract int beginSize32();
+ protected abstract void endSize32(int pos);
+
public void writeUint8(short b)
{
assert b < 0x100;
@@ -132,23 +145,6 @@ abstract class AbstractEncoder implements Encoder
writeUint64(l);
}
- private static final String checkLength(String s, int n)
- {
- if (s == null)
- {
- return "";
- }
-
- if (s.length() > n)
- {
- throw new IllegalArgumentException("string too long: " + s);
- }
- else
- {
- return s;
- }
- }
-
private static final byte[] encode(String s, String charset)
{
try
@@ -163,16 +159,31 @@ abstract class AbstractEncoder implements Encoder
public void writeStr8(String s)
{
- s = checkLength(s, 255);
- writeUint8((short) s.length());
- put(ByteBuffer.wrap(encode(s, "UTF-8")));
+ if (s == null)
+ {
+ s = "";
+ }
+
+ byte[] bytes = str8cache.get(s);
+ if (bytes == null)
+ {
+ bytes = encode(s, "UTF-8");
+ str8cache.put(s, bytes);
+ }
+ writeUint8((short) bytes.length);
+ put(bytes);
}
public void writeStr16(String s)
{
- s = checkLength(s, 65535);
- writeUint16(s.length());
- put(ByteBuffer.wrap(encode(s, "UTF-8")));
+ if (s == null)
+ {
+ s = "";
+ }
+
+ byte[] bytes = encode(s, "UTF-8");
+ writeUint16(bytes.length);
+ put(bytes);
}
public void writeVbin8(byte[] bytes)
@@ -245,18 +256,10 @@ abstract class AbstractEncoder implements Encoder
}
int width = s.getSizeWidth();
+ int pos = -1;
if (width > 0)
{
- if (empty)
- {
- writeSize(width, 0);
- }
- else
- {
- Sizer sizer = sizer();
- s.write(sizer);
- writeSize(width, sizer.size());
- }
+ pos = beginSize(width);
}
if (type > 0)
@@ -265,6 +268,11 @@ abstract class AbstractEncoder implements Encoder
}
s.write(this);
+
+ if (width > 0)
+ {
+ endSize(width, pos);
+ }
}
public void writeStruct32(Struct s)
@@ -275,12 +283,10 @@ abstract class AbstractEncoder implements Encoder
}
else
{
- Sizer sizer = sizer();
- sizer.writeUint16(s.getEncodedType());
- s.write(sizer);
- writeUint32(sizer.size());
+ int pos = beginSize32();
writeUint16(s.getEncodedType());
s.write(this);
+ endSize32(pos);
}
}
@@ -338,18 +344,13 @@ abstract class AbstractEncoder implements Encoder
public void writeMap(Map<String,Object> map)
{
- if (map == null)
+ int pos = beginSize32();
+ if (map != null)
{
- writeUint32(0);
- return;
+ writeUint32(map.size());
+ writeMapEntries(map);
}
-
- Sizer sizer = sizer();
- sizer.writeMap(map);
- // XXX: - 4
- writeUint32(sizer.size() - 4);
- writeUint32(map.size());
- writeMapEntries(map);
+ endSize32(pos);
}
protected void writeMapEntries(Map<String,Object> map)
@@ -367,12 +368,13 @@ abstract class AbstractEncoder implements Encoder
public void writeList(List<Object> list)
{
- Sizer sizer = sizer();
- sizer.writeList(list);
- // XXX: - 4
- writeUint32(sizer.size() - 4);
- writeUint32(list.size());
- writeListEntries(list);
+ int pos = beginSize32();
+ if (list != null)
+ {
+ writeUint32(list.size());
+ writeListEntries(list);
+ }
+ endSize32(pos);
}
protected void writeListEntries(List<Object> list)
@@ -387,16 +389,12 @@ abstract class AbstractEncoder implements Encoder
public void writeArray(List<Object> array)
{
- if (array == null)
+ int pos = beginSize32();
+ if (array != null)
{
- array = Collections.EMPTY_LIST;
+ writeArrayEntries(array);
}
-
- Sizer sizer = sizer();
- sizer.writeArray(array);
- // XXX: -4
- writeUint32(sizer.size() - 4);
- writeArrayEntries(array);
+ endSize32(pos);
}
protected void writeArrayEntries(List<Object> array)
@@ -458,6 +456,39 @@ abstract class AbstractEncoder implements Encoder
}
}
+ private int beginSize(int width)
+ {
+ switch (width)
+ {
+ case 1:
+ return beginSize8();
+ case 2:
+ return beginSize16();
+ case 4:
+ return beginSize32();
+ default:
+ throw new IllegalStateException("illegal width: " + width);
+ }
+ }
+
+ private void endSize(int width, int pos)
+ {
+ switch (width)
+ {
+ case 1:
+ endSize8(pos);
+ break;
+ case 2:
+ endSize16(pos);
+ break;
+ case 4:
+ endSize32(pos);
+ break;
+ default:
+ throw new IllegalStateException("illegal width: " + width);
+ }
+ }
+
private void writeBytes(Type t, byte[] bytes)
{
writeSize(t, bytes.length);
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/BBDecoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/BBDecoder.java
index cf40cef8bf..f036fd8dee 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/BBDecoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/BBDecoder.java
@@ -23,6 +23,8 @@ package org.apache.qpidity.transport.codec;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import org.apache.qpidity.transport.Binary;
+
/**
* BBDecoder
@@ -33,9 +35,9 @@ import java.nio.ByteOrder;
public final class BBDecoder extends AbstractDecoder
{
- private final ByteBuffer in;
+ private ByteBuffer in;
- public BBDecoder(ByteBuffer in)
+ public void init(ByteBuffer in)
{
this.in = in;
this.in.order(ByteOrder.BIG_ENDIAN);
@@ -51,6 +53,21 @@ public final class BBDecoder extends AbstractDecoder
in.get(bytes);
}
+ protected Binary get(int size)
+ {
+ if (in.hasArray())
+ {
+ byte[] bytes = in.array();
+ Binary bin = new Binary(bytes, in.arrayOffset() + in.position(), size);
+ in.position(in.position() + size);
+ return bin;
+ }
+ else
+ {
+ return super.get(size);
+ }
+ }
+
public boolean hasRemaining()
{
return in.hasRemaining();
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/BBEncoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/BBEncoder.java
index 2e7b41bf42..a976d38efc 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/BBEncoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/BBEncoder.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpidity.transport.codec;
+import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -33,47 +34,194 @@ import java.nio.ByteOrder;
public final class BBEncoder extends AbstractEncoder
{
- private final ByteBuffer out;
+ private ByteBuffer out;
- public BBEncoder(ByteBuffer out) {
- this.out = out;
- this.out.order(ByteOrder.BIG_ENDIAN);
+ public BBEncoder(int capacity) {
+ out = ByteBuffer.allocate(capacity);
+ out.order(ByteOrder.BIG_ENDIAN);
+ }
+
+ public void init()
+ {
+ out.clear();
+ }
+
+ public ByteBuffer done()
+ {
+ out.flip();
+ ByteBuffer encoded = ByteBuffer.allocate(out.remaining());
+ encoded.put(out);
+ encoded.flip();
+ return encoded;
+ }
+
+ private void grow(int size)
+ {
+ ByteBuffer old = out;
+ int capacity = old.capacity();
+ out = ByteBuffer.allocate(Math.max(capacity + size, 2*capacity));
+ out.order(ByteOrder.BIG_ENDIAN);
+ out.put(old);
}
protected void doPut(byte b)
{
- out.put(b);
+ try
+ {
+ out.put(b);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(1);
+ out.put(b);
+ }
}
protected void doPut(ByteBuffer src)
{
- out.put(src);
+ try
+ {
+ out.put(src);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(src.remaining());
+ out.put(src);
+ }
+ }
+
+ protected void put(byte[] bytes)
+ {
+ try
+ {
+ out.put(bytes);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(bytes.length);
+ out.put(bytes);
+ }
}
public void writeUint8(short b)
{
assert b < 0x100;
- out.put((byte) b);
+ try
+ {
+ out.put((byte) b);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(1);
+ out.put((byte) b);
+ }
}
public void writeUint16(int s)
{
assert s < 0x10000;
- out.putShort((short) s);
+ try
+ {
+ out.putShort((short) s);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(2);
+ out.putShort((short) s);
+ }
}
public void writeUint32(long i)
{
assert i < 0x100000000L;
- out.putInt((int) i);
+ try
+ {
+ out.putInt((int) i);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(4);
+ out.putInt((int) i);
+ }
}
public void writeUint64(long l)
{
- out.putLong(l);
+ try
+ {
+ out.putLong(l);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(8);
+ out.putLong(l);
+ }
+ }
+
+ public int beginSize8()
+ {
+ int pos = out.position();
+ try
+ {
+ out.put((byte) 0);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(1);
+ out.put((byte) 0);
+ }
+ return pos;
+ }
+
+ public void endSize8(int pos)
+ {
+ int cur = out.position();
+ out.put(pos, (byte) (cur - pos - 1));
+ }
+
+ public int beginSize16()
+ {
+ int pos = out.position();
+ try
+ {
+ out.putShort((short) 0);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(2);
+ out.putShort((short) 0);
+ }
+ return pos;
+ }
+
+ public void endSize16(int pos)
+ {
+ int cur = out.position();
+ out.putShort(pos, (short) (cur - pos - 2));
+ }
+
+ public int beginSize32()
+ {
+ int pos = out.position();
+ try
+ {
+ out.putInt(0);
+ }
+ catch (BufferOverflowException e)
+ {
+ grow(4);
+ out.putInt(0);
+ }
+ return pos;
+ }
+
+ public void endSize32(int pos)
+ {
+ int cur = out.position();
+ out.putInt(pos, (cur - pos - 4));
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/FragmentDecoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/FragmentDecoder.java
deleted file mode 100644
index 474211ced2..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/FragmentDecoder.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity.transport.codec;
-
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-
-import java.util.Iterator;
-
-import static java.lang.Math.*;
-
-
-/**
- * FragmentDecoder
- *
- * @author Rafael H. Schloming
- */
-
-public class FragmentDecoder extends AbstractDecoder
-{
-
- private final Iterator<ByteBuffer> fragments;
- private ByteBuffer current;
-
- public FragmentDecoder(Iterator<ByteBuffer> fragments)
- {
- this.fragments = fragments;
- this.current = null;
- }
-
- public boolean hasRemaining()
- {
- advance();
- return current != null || fragments.hasNext();
- }
-
- private void advance()
- {
- while (current == null && fragments.hasNext())
- {
- current = fragments.next();
- if (current.hasRemaining())
- {
- break;
- }
- else
- {
- current = null;
- }
- }
- }
-
- private void preRead()
- {
- advance();
-
- if (current == null)
- {
- throw new BufferUnderflowException();
- }
- }
-
- private void postRead()
- {
- if (current.remaining() == 0)
- {
- current = null;
- }
- }
-
- protected byte doGet()
- {
- preRead();
- byte b = current.get();
- postRead();
- return b;
- }
-
- protected void doGet(byte[] bytes)
- {
- int remaining = bytes.length;
- int offset = 0;
- while (remaining > 0)
- {
- preRead();
- int size = min(remaining, current.remaining());
- current.get(bytes, offset, size);
- offset += size;
- remaining -= size;
- postRead();
- }
- }
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/SizeEncoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/SizeEncoder.java
deleted file mode 100644
index 2e7e883a0b..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/SizeEncoder.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity.transport.codec;
-
-import java.nio.ByteBuffer;
-
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.qpidity.transport.RangeSet;
-
-
-/**
- * SizeEncoder
- *
- * @author Rafael H. Schloming
- */
-
-public class SizeEncoder extends AbstractEncoder implements Sizer
-{
-
- private int size;
-
- public SizeEncoder() {
- this(0);
- }
-
- public SizeEncoder(int size) {
- this.size = size;
- }
-
- protected Sizer sizer()
- {
- return Sizer.NULL;
- }
-
- public int getSize() {
- return size;
- }
-
- public void setSize(int size) {
- this.size = size;
- }
-
- public int size()
- {
- return getSize();
- }
-
- protected void doPut(byte b)
- {
- size += 1;
- }
-
- protected void doPut(ByteBuffer src)
- {
- size += src.remaining();
- }
-
- public void writeUint8(short b)
- {
- size += 1;
- }
-
- public void writeUint16(int s)
- {
- size += 2;
- }
-
- public void writeUint32(long i)
- {
- size += 4;
- }
-
- public void writeUint64(long l)
- {
- size += 8;
- }
-
- public void writeDatetime(long l)
- {
- size += 8;
- }
-
- public void writeUuid(UUID uuid)
- {
- size += 16;
- }
-
- public void writeSequenceNo(int s)
- {
- size += 4;
- }
-
- public void writeSequenceSet(RangeSet ranges)
- {
- size += 2 + 8*ranges.size();
- }
-
- //void writeByteRanges(RangeSet ranges); // XXX
-
- //void writeStr8(String s);
- //void writeStr16(String s);
-
- //void writeVbin8(byte[] bytes);
- //void writeVbin16(byte[] bytes);
- //void writeVbin32(byte[] bytes);
-
- //void writeStruct32(Struct s);
- //void writeMap(Map<String,Object> map);
- //void writeList(List<Object> list);
- //void writeArray(List<Object> array);
-
- //void writeStruct(int type, Struct s);
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java
deleted file mode 100644
index d386987d64..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity.transport.codec;
-
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.qpidity.transport.RangeSet;
-import org.apache.qpidity.transport.Struct;
-
-
-/**
- * Sizer
- *
- */
-
-public interface Sizer extends Encoder
-{
-
- public static final Sizer NULL = new Sizer()
- {
- public void writeUint8(short b) {}
- public void writeUint16(int s) {}
- public void writeUint32(long i) {}
- public void writeUint64(long l) {}
-
- public void writeDatetime(long l) {}
- public void writeUuid(UUID uuid) {}
-
- public void writeSequenceNo(int s) {}
- public void writeSequenceSet(RangeSet ranges) {} // XXX
- public void writeByteRanges(RangeSet ranges) {} // XXX
-
- public void writeStr8(String s) {}
- public void writeStr16(String s) {}
-
- public void writeVbin8(byte[] bytes) {}
- public void writeVbin16(byte[] bytes) {}
- public void writeVbin32(byte[] bytes) {}
-
- public void writeStruct32(Struct s) {}
- public void writeMap(Map<String,Object> map) {}
- public void writeList(List<Object> list) {}
- public void writeArray(List<Object> array) {}
-
- public void writeStruct(int type, Struct s) {}
-
- public int getSize() { return 0; }
-
- public int size() { return 0; }
- };
-
- int getSize();
-
- int size();
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java
index 3a7a550573..e188c15b35 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java
@@ -29,7 +29,6 @@ import java.nio.ByteBuffer;
import org.apache.qpidity.transport.codec.BBDecoder;
import org.apache.qpidity.transport.codec.Decoder;
-import org.apache.qpidity.transport.codec.FragmentDecoder;
import org.apache.qpidity.transport.ConnectionEvent;
import org.apache.qpidity.transport.Data;
@@ -52,26 +51,32 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
{
private final Receiver<ConnectionEvent> receiver;
- private final Map<Integer,List<ByteBuffer>> segments;
+ private final Map<Integer,List<Frame>> segments;
+ private final ThreadLocal<BBDecoder> decoder = new ThreadLocal<BBDecoder>()
+ {
+ public BBDecoder initialValue()
+ {
+ return new BBDecoder();
+ }
+ };
public Assembler(Receiver<ConnectionEvent> receiver)
{
this.receiver = receiver;
- segments = new HashMap<Integer,List<ByteBuffer>>();
+ segments = new HashMap<Integer,List<Frame>>();
}
private int segmentKey(Frame frame)
{
- // XXX: can this overflow?
return (frame.getTrack() + 1) * frame.getChannel();
}
- private List<ByteBuffer> getSegment(Frame frame)
+ private List<Frame> getSegment(Frame frame)
{
return segments.get(segmentKey(frame));
}
- private void setSegment(Frame frame, List<ByteBuffer> segment)
+ private void setSegment(Frame frame, List<Frame> segment)
{
int key = segmentKey(frame);
if (segments.containsKey(key))
@@ -122,7 +127,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
switch (frame.getType())
{
case BODY:
- emit(frame, new Data(frame, frame.isFirstFrame(),
+ emit(frame, new Data(frame.getBody(), frame.isFirstFrame(),
frame.isLastFrame()));
break;
default:
@@ -138,42 +143,54 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
private void assemble(Frame frame)
{
- List<ByteBuffer> segment;
- if (frame.isFirstFrame())
+ ByteBuffer segment;
+ if (frame.isFirstFrame() && frame.isLastFrame())
{
- segment = new ArrayList<ByteBuffer>();
- setSegment(frame, segment);
+ segment = frame.getBody();
+ emit(frame, decode(frame, segment));
}
else
{
- segment = getSegment(frame);
- }
+ List<Frame> frames;
+ if (frame.isFirstFrame())
+ {
+ frames = new ArrayList<Frame>();
+ setSegment(frame, frames);
+ }
+ else
+ {
+ frames = getSegment(frame);
+ }
- for (ByteBuffer buf : frame)
- {
- segment.add(buf);
- }
+ frames.add(frame);
- if (frame.isLastFrame())
- {
- clearSegment(frame);
- emit(frame, decode(frame, frame.getType(), segment));
+ if (frame.isLastFrame())
+ {
+ clearSegment(frame);
+
+ int size = 0;
+ for (Frame f : frames)
+ {
+ size += f.getSize();
+ }
+ segment = ByteBuffer.allocate(size);
+ for (Frame f : frames)
+ {
+ segment.put(f.getBody());
+ }
+ segment.flip();
+ emit(frame, decode(frame, segment));
+ }
}
+
}
- private ProtocolEvent decode(Frame frame, SegmentType type, List<ByteBuffer> segment)
+ private ProtocolEvent decode(Frame frame, ByteBuffer segment)
{
- Decoder dec;
- if (segment.size() == 1)
- {
- dec = new BBDecoder(segment.get(0));
- }
- else
- {
- dec = new FragmentDecoder(segment.iterator());
- }
+ BBDecoder dec = decoder.get();
+ dec.init(segment);
- switch (type)
+ switch (frame.getType())
{
case CONTROL:
int controlType = dec.readUint16();
@@ -193,9 +210,9 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
{
structs.add(dec.readStruct32());
}
- return new Header(structs,frame.isLastFrame() && frame.isLastSegment());
+ return new Header(structs, frame.isLastFrame() && frame.isLastSegment());
default:
- throw new IllegalStateException("unknown frame type: " + type);
+ throw new IllegalStateException("unknown frame type: " + frame.getType());
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java
index da9ba84ab0..074057df56 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java
@@ -21,7 +21,6 @@
package org.apache.qpidity.transport.network;
import org.apache.qpidity.transport.codec.BBEncoder;
-import org.apache.qpidity.transport.codec.SizeEncoder;
import org.apache.qpidity.transport.ConnectionEvent;
import org.apache.qpidity.transport.Data;
@@ -54,6 +53,13 @@ public class Disassembler implements Sender<ConnectionEvent>,
private final Sender<NetworkEvent> sender;
private final int maxPayload;
+ private final ThreadLocal<BBEncoder> encoder = new ThreadLocal()
+ {
+ public BBEncoder initialValue()
+ {
+ return new BBEncoder(4*1024);
+ }
+ };
public Disassembler(Sender<NetworkEvent> sender, int maxFrame)
{
@@ -72,6 +78,11 @@ public class Disassembler implements Sender<ConnectionEvent>,
event.getProtocolEvent().delegate(event, this);
}
+ public void flush()
+ {
+ sender.flush();
+ }
+
public void close()
{
sender.close();
@@ -92,8 +103,7 @@ public class Disassembler implements Sender<ConnectionEvent>,
first = false;
}
nflags |= LAST_FRAME;
- Frame frame = new Frame(nflags, type, track, event.getChannel());
- // frame.addFragment(buf);
+ Frame frame = new Frame(nflags, type, track, event.getChannel(), buf.slice());
sender.send(frame);
}
else
@@ -115,8 +125,7 @@ public class Disassembler implements Sender<ConnectionEvent>,
newflags |= LAST_FRAME;
}
- Frame frame = new Frame(newflags, type, track, event.getChannel());
- frame.addFragment(slice);
+ Frame frame = new Frame(newflags, type, track, event.getChannel(), slice);
sender.send(frame);
}
}
@@ -137,18 +146,18 @@ public class Disassembler implements Sender<ConnectionEvent>,
method(event, method, SegmentType.COMMAND);
}
- private void method(ConnectionEvent event, Method method, SegmentType type)
+ private ByteBuffer copy(ByteBuffer src)
{
- SizeEncoder sizer = new SizeEncoder();
- sizer.writeUint16(method.getEncodedType());
- if (type == SegmentType.COMMAND)
- {
- sizer.writeUint16(0);
- }
- method.write(sizer);
+ ByteBuffer buf = ByteBuffer.allocate(src.remaining());
+ buf.put(src);
+ buf.flip();
+ return buf;
+ }
- ByteBuffer buf = ByteBuffer.allocate(sizer.size());
- BBEncoder enc = new BBEncoder(buf);
+ private void method(ConnectionEvent event, Method method, SegmentType type)
+ {
+ BBEncoder enc = encoder.get();
+ enc.init();
enc.writeUint16(method.getEncodedType());
if (type == SegmentType.COMMAND)
{
@@ -162,7 +171,7 @@ public class Disassembler implements Sender<ConnectionEvent>,
}
}
method.write(enc);
- buf.flip();
+ ByteBuffer buf = enc.done();
byte flags = FIRST_SEG;
@@ -176,42 +185,29 @@ public class Disassembler implements Sender<ConnectionEvent>,
public void header(ConnectionEvent event, Header header)
{
- ByteBuffer buf;
- if( header.getBuf() == null)
+ ByteBuffer buf;
+ if (header.getBuf() == null)
{
- SizeEncoder sizer = new SizeEncoder();
- for (Struct st : header.getStructs())
- {
- sizer.writeStruct32(st);
- }
-
- buf = ByteBuffer.allocate(sizer.size());
- BBEncoder enc = new BBEncoder(buf);
+ BBEncoder enc = encoder.get();
+ enc.init();
for (Struct st : header.getStructs())
{
enc.writeStruct32(st);
}
+ buf = enc.done();
header.setBuf(buf);
}
else
{
buf = header.getBuf();
+ buf.flip();
}
- buf.flip();
fragment((byte) 0x0, SegmentType.HEADER, event, buf, true, true);
}
public void data(ConnectionEvent event, Data data)
{
- boolean first = data.isFirst();
- for (Iterator<ByteBuffer> it = data.getFragments().iterator();
- it.hasNext(); )
- {
- ByteBuffer buf = it.next();
- boolean last = data.isLast() && !it.hasNext();
- fragment(LAST_SEG, SegmentType.BODY, event, buf, first, last);
- first = false;
- }
+ fragment(LAST_SEG, SegmentType.BODY, event, data.getData(), data.isFirst(), data.isLast());
}
public void error(ConnectionEvent event, ProtocolError error)
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java
index 2abac382e6..7b8675b39d 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java
@@ -38,7 +38,7 @@ import static org.apache.qpidity.transport.util.Functions.*;
* @author Rafael H. Schloming
*/
-public final class Frame implements NetworkEvent, Iterable<ByteBuffer>
+public final class Frame implements NetworkEvent
{
public static final int HEADER_SIZE = 12;
@@ -61,23 +61,21 @@ public final class Frame implements NetworkEvent, Iterable<ByteBuffer>
final private SegmentType type;
final private byte track;
final private int channel;
- final private List<ByteBuffer> fragments;
- private int size;
+ final private ByteBuffer body;
- public Frame(byte flags, SegmentType type, byte track, int channel)
+ public Frame(byte flags, SegmentType type, byte track, int channel,
+ ByteBuffer body)
{
this.flags = flags;
this.type = type;
this.track = track;
this.channel = channel;
- this.size = 0;
- this.fragments = new ArrayList<ByteBuffer>();
+ this.body = body;
}
- public void addFragment(ByteBuffer fragment)
+ public ByteBuffer getBody()
{
- fragments.add(fragment);
- size += fragment.remaining();
+ return body.slice();
}
public byte getFlags()
@@ -92,7 +90,7 @@ public final class Frame implements NetworkEvent, Iterable<ByteBuffer>
public int getSize()
{
- return size;
+ return body.remaining();
}
public SegmentType getType()
@@ -130,16 +128,6 @@ public final class Frame implements NetworkEvent, Iterable<ByteBuffer>
return flag(LAST_FRAME);
}
- public Iterator<ByteBuffer> getFragments()
- {
- return new SliceIterator(fragments.iterator());
- }
-
- public Iterator<ByteBuffer> iterator()
- {
- return getFragments();
- }
-
public void delegate(NetworkDelegate delegate)
{
delegate.frame(this);
@@ -148,26 +136,14 @@ public final class Frame implements NetworkEvent, Iterable<ByteBuffer>
public String toString()
{
StringBuilder str = new StringBuilder();
+
str.append(String.format
("[%05d %05d %1d %s %d%d%d%d] ", getChannel(), getSize(),
getTrack(), getType(),
isFirstSegment() ? 1 : 0, isLastSegment() ? 1 : 0,
isFirstFrame() ? 1 : 0, isLastFrame() ? 1 : 0));
- boolean first = true;
- for (ByteBuffer buf : this)
- {
- if (first)
- {
- first = false;
- }
- else
- {
- str.append(" | ");
- }
-
- str.append(str(buf));
- }
+ str.append(str(body));
return str.toString();
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
index d1c03348b4..48f68e9020 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
@@ -79,7 +79,7 @@ public class InputHandler implements Receiver<ByteBuffer>
private byte track;
private int channel;
private int size;
- private Frame frame;
+ private ByteBuffer body;
public InputHandler(Receiver<NetworkEvent> receiver, State state)
{
@@ -99,9 +99,9 @@ public class InputHandler implements Receiver<ByteBuffer>
private void frame()
{
+ Frame frame = new Frame(flags, type, track, channel, body);
assert size == frame.getSize();
receiver.received(frame);
- frame = null;
}
private void error(String fmt, Object ... args)
@@ -191,30 +191,28 @@ public class InputHandler implements Receiver<ByteBuffer>
return ERROR;
}
- frame = new Frame(flags, type, track, channel);
if (size > buf.remaining()) {
- frame.addFragment(buf.slice());
- buf.position(buf.limit());
+ body = ByteBuffer.allocate(size);
+ body.put(buf);
return FRAME_FRAGMENT;
} else {
- ByteBuffer payload = buf.slice();
- payload.limit(size);
+ body = buf.slice();
+ body.limit(size);
buf.position(buf.position() + size);
- frame.addFragment(payload);
frame();
return FRAME_HDR;
}
case FRAME_FRAGMENT:
- int delta = size - frame.getSize();
+ int delta = body.remaining();
if (delta > buf.remaining()) {
- frame.addFragment(buf.slice());
- buf.position(buf.limit());
+ body.put(buf);
return FRAME_FRAGMENT;
} else {
ByteBuffer fragment = buf.slice();
fragment.limit(delta);
buf.position(buf.position() + delta);
- frame.addFragment(fragment);
+ body.put(fragment);
+ body.flip();
frame();
return FRAME_HDR;
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java b/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
index b749332fa3..e2ef8ca0d5 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
@@ -69,6 +69,7 @@ public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate
synchronized (lock)
{
sender.send(header.toByteBuffer());
+ sender.flush();
}
}
@@ -79,35 +80,43 @@ public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate
frames.add(frame);
bytes += HEADER_SIZE + frame.getSize();
- if (frame.isLastFrame() && frame.isLastSegment() || bytes > 64*1024)
+ if (bytes > 64*1024)
{
- ByteBuffer buf = ByteBuffer.allocate(bytes);
- for (Frame f : frames)
- {
- buf.put(f.getFlags());
- buf.put((byte) f.getType().getValue());
- buf.putShort((short) (f.getSize() + HEADER_SIZE));
- // RESERVED
- buf.put(RESERVED);
- buf.put(f.getTrack());
- buf.putShort((short) f.getChannel());
- // RESERVED
- buf.putInt(0);
- for(ByteBuffer frg : f)
- {
- buf.put(frg);
- }
- }
- buf.flip();
-
- frames.clear();
- bytes = 0;
-
- sender.send(buf);
+ flush();
}
}
}
+ public void flush()
+ {
+ synchronized (lock)
+ {
+ ByteBuffer buf = ByteBuffer.allocate(bytes);
+ int nframes = frames.size();
+ for (int i = 0; i < nframes; i++)
+ {
+ Frame frame = frames.get(i);
+ buf.put(frame.getFlags());
+ buf.put((byte) frame.getType().getValue());
+ buf.putShort((short) (frame.getSize() + HEADER_SIZE));
+ // RESERVED
+ buf.put(RESERVED);
+ buf.put(frame.getTrack());
+ buf.putShort((short) frame.getChannel());
+ // RESERVED
+ buf.putInt(0);
+ buf.put(frame.getBody());
+ }
+ buf.flip();
+
+ frames.clear();
+ bytes = 0;
+
+ sender.send(buf);
+ sender.flush();
+ }
+ }
+
public void error(ProtocolError error)
{
throw new IllegalStateException("XXX");
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java
index 1adde531a6..d50442be5c 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java
@@ -55,6 +55,11 @@ public class IoSender implements Sender<java.nio.ByteBuffer>
write(buf);
}
+ public void flush()
+ {
+ // pass
+ }
+
/* The extra copying sucks.
* If I know for sure that the buf is backed
* by an array then I could do buf.array()
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java b/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java
index f0f5731037..a53c36ae2e 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java
@@ -58,6 +58,11 @@ public class MinaSender implements Sender<java.nio.ByteBuffer>
}
}
+ public void flush()
+ {
+ // pass
+ }
+
public synchronized void close()
{
// MINA will sometimes throw away in-progress writes when you
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java b/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java
index 2cfe6c2089..798f24b528 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java
@@ -47,6 +47,11 @@ public class NioSender implements Sender<java.nio.ByteBuffer>
}
}
+ public void flush()
+ {
+ // pass
+ }
+
private void write(java.nio.ByteBuffer buf)
{
synchronized (lock)