summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/transport
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java7
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java10
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java65
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java57
6 files changed, 92 insertions, 61 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java
index 4e97855a6f..491a7ac218 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java
@@ -142,4 +142,13 @@ public final class Binary
return str(ByteBuffer.wrap(bytes, offset, size));
}
+ public boolean hasExcessCapacity()
+ {
+ return size != bytes.length;
+ }
+
+ public Binary copy()
+ {
+ return new Binary(getBytes());
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
index b8e7616a37..f21df251da 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
@@ -35,9 +35,7 @@ import org.slf4j.LoggerFactory;
/**
* ServerDelegate
- *
*/
-
public class ServerDelegate extends ConnectionDelegate
{
protected static final Logger _logger = LoggerFactory.getLogger(ServerDelegate.class);
@@ -140,12 +138,12 @@ public class ServerDelegate extends ConnectionDelegate
protected int getHeartbeatMax()
{
- return Integer.MAX_VALUE;
+ return 0xFFFF;
}
protected int getChannelMax()
{
- return Integer.MAX_VALUE;
+ return 0xFFFF;
}
@Override
@@ -202,5 +200,4 @@ public class ServerDelegate extends ConnectionDelegate
ssn.sessionAttached(atc.getName());
ssn.setState(Session.State.OPEN);
}
-
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java
index a8a4997ae7..09ce6a7eb1 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java
@@ -143,10 +143,18 @@ abstract class AbstractDecoder implements Decoder
short size = readUint8();
Binary bin = get(size);
String str = str8cache.get(bin);
+
if (str == null)
{
str = decode(bin.array(), bin.offset(), bin.size(), "UTF-8");
- str8cache.put(bin, str);
+ if(bin.hasExcessCapacity())
+ {
+ str8cache.put(bin.copy(), str);
+ }
+ else
+ {
+ str8cache.put(bin, str);
+ }
}
return str;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java
index 6f7a2fa3b2..10f67e1cd6 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java
@@ -41,6 +41,11 @@ public final class BBDecoder extends AbstractDecoder
this.in.order(ByteOrder.BIG_ENDIAN);
}
+ public void releaseBuffer()
+ {
+ in = null;
+ }
+
protected byte doGet()
{
return in.get();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
index 357caa26e1..1a85ab88a5 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
@@ -20,38 +20,36 @@
*/
package org.apache.qpid.transport.network;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.transport.codec.BBDecoder;
-import org.apache.qpid.transport.codec.Decoder;
-
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolError;
import org.apache.qpid.transport.ProtocolEvent;
import org.apache.qpid.transport.ProtocolHeader;
import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.SegmentType;
import org.apache.qpid.transport.Struct;
-
+import org.apache.qpid.transport.codec.BBDecoder;
/**
* Assembler
*
*/
-
public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
{
+ // Use a small array to store incomplete Methods for low-value channels, instead of allocating a huge
+ // array or always boxing the channelId and looking it up in the map. This value must be of the form 2^X - 1.
+ private static final int ARRAY_SIZE = 0xFF;
+ private final Method[] _incompleteMethodArray = new Method[ARRAY_SIZE + 1];
+ private final Map<Integer, Method> _incompleteMethodMap = new HashMap<Integer, Method>();
private final Receiver<ProtocolEvent> receiver;
private final Map<Integer,List<Frame>> segments;
- private final Method[] incomplete;
- private final ThreadLocal<BBDecoder> decoder = new ThreadLocal<BBDecoder>()
+ private static final ThreadLocal<BBDecoder> _decoder = new ThreadLocal<BBDecoder>()
{
public BBDecoder initialValue()
{
@@ -63,7 +61,6 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
{
this.receiver = receiver;
segments = new HashMap<Integer,List<Frame>>();
- incomplete = new Method[64*1024];
}
private int segmentKey(Frame frame)
@@ -169,7 +166,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
private void assemble(Frame frame, ByteBuffer segment)
{
- BBDecoder dec = decoder.get();
+ BBDecoder dec = _decoder.get();
dec.init(segment);
int channel = frame.getChannel();
@@ -192,7 +189,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
command.read(dec);
if (command.hasPayload())
{
- incomplete[channel] = command;
+ setIncompleteCommand(channel, command);
}
else
{
@@ -200,8 +197,8 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
}
break;
case HEADER:
- command = incomplete[channel];
- List<Struct> structs = new ArrayList(2);
+ command = getIncompleteCommand(channel);
+ List<Struct> structs = new ArrayList<Struct>(2);
while (dec.hasRemaining())
{
structs.add(dec.readStruct32());
@@ -209,19 +206,51 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
command.setHeader(new Header(structs));
if (frame.isLastSegment())
{
- incomplete[channel] = null;
+ setIncompleteCommand(channel, null);
emit(channel, command);
}
break;
case BODY:
- command = incomplete[channel];
+ command = getIncompleteCommand(channel);
command.setBody(segment);
- incomplete[channel] = null;
+ setIncompleteCommand(channel, null);
emit(channel, command);
break;
default:
throw new IllegalStateException("unknown frame type: " + frame.getType());
}
+
+ dec.releaseBuffer();
}
+ private void setIncompleteCommand(int channelId, Method incomplete)
+ {
+ if ((channelId & ARRAY_SIZE) == channelId)
+ {
+ _incompleteMethodArray[channelId] = incomplete;
+ }
+ else
+ {
+ if(incomplete != null)
+ {
+ _incompleteMethodMap.put(channelId, incomplete);
+ }
+ else
+ {
+ _incompleteMethodMap.remove(channelId);
+ }
+ }
+ }
+
+ private Method getIncompleteCommand(int channelId)
+ {
+ if ((channelId & ARRAY_SIZE) == channelId)
+ {
+ return _incompleteMethodArray[channelId];
+ }
+ else
+ {
+ return _incompleteMethodMap.get(channelId);
+ }
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
index ab174b00b3..685034d1a9 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
@@ -40,21 +40,15 @@ import static java.lang.Math.min;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-
/**
* Disassembler
- *
*/
-
-public final class Disassembler implements Sender<ProtocolEvent>,
- ProtocolDelegate<Void>
+public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void>
{
-
private final Sender<ByteBuffer> sender;
private final int maxPayload;
- private final ByteBuffer header;
private final Object sendlock = new Object();
- private final ThreadLocal<BBEncoder> encoder = new ThreadLocal<BBEncoder>()
+ private final static ThreadLocal<BBEncoder> _encoder = new ThreadLocal<BBEncoder>()
{
public BBEncoder initialValue()
{
@@ -66,14 +60,10 @@ public final class Disassembler implements Sender<ProtocolEvent>,
{
if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
{
- throw new IllegalArgumentException
- ("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
+ throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
}
this.sender = sender;
this.maxPayload = maxFrame - HEADER_SIZE;
- this.header = ByteBuffer.allocate(HEADER_SIZE);
- this.header.order(ByteOrder.BIG_ENDIAN);
-
}
public void send(ProtocolEvent event)
@@ -101,25 +91,27 @@ public final class Disassembler implements Sender<ProtocolEvent>,
{
synchronized (sendlock)
{
- header.put(0, flags);
- header.put(1, type);
- header.putShort(2, (short) (size + HEADER_SIZE));
- header.put(5, track);
- header.putShort(6, (short) channel);
-
- header.rewind();
-
- sender.send(header);
+ ByteBuffer data = ByteBuffer.allocate(size + HEADER_SIZE);
+ data.order(ByteOrder.BIG_ENDIAN);
+
+ data.put(0, flags);
+ data.put(1, type);
+ data.putShort(2, (short) (size + HEADER_SIZE));
+ data.put(5, track);
+ data.putShort(6, (short) channel);
+ data.position(HEADER_SIZE);
int limit = buf.limit();
buf.limit(buf.position() + size);
- sender.send(buf);
+ data.put(buf);
buf.limit(limit);
+
+ data.rewind();
+ sender.send(data);
}
}
- private void fragment(byte flags, SegmentType type, ProtocolEvent event,
- ByteBuffer buf)
+ private void fragment(byte flags, SegmentType type, ProtocolEvent event, ByteBuffer buf)
{
byte typeb = (byte) type.getValue();
byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0;
@@ -170,17 +162,9 @@ public final class Disassembler implements Sender<ProtocolEvent>,
method(method, SegmentType.COMMAND);
}
- private ByteBuffer copy(ByteBuffer src)
- {
- ByteBuffer buf = ByteBuffer.allocate(src.remaining());
- buf.put(src);
- buf.flip();
- return buf;
- }
-
private void method(Method method, SegmentType type)
{
- BBEncoder enc = encoder.get();
+ BBEncoder enc = _encoder.get();
enc.init();
enc.writeUint16(method.getEncodedType());
if (type == SegmentType.COMMAND)
@@ -227,8 +211,7 @@ public final class Disassembler implements Sender<ProtocolEvent>,
if (payload)
{
ByteBuffer body = method.getBody();
- fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER,
- method, headerSeg);
+ fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, headerSeg);
if (body != null)
{
fragment(LAST_SEG, SegmentType.BODY, method, body);
@@ -240,7 +223,7 @@ public final class Disassembler implements Sender<ProtocolEvent>,
public void error(Void v, ProtocolError error)
{
- throw new IllegalArgumentException("" + error);
+ throw new IllegalArgumentException(String.valueOf(error));
}
public void setIdleTimeout(int i)