summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java65
1 files changed, 47 insertions, 18 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
index 357caa26e1..1a85ab88a5 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
@@ -20,38 +20,36 @@
*/
package org.apache.qpid.transport.network;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.transport.codec.BBDecoder;
-import org.apache.qpid.transport.codec.Decoder;
-
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolError;
import org.apache.qpid.transport.ProtocolEvent;
import org.apache.qpid.transport.ProtocolHeader;
import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.SegmentType;
import org.apache.qpid.transport.Struct;
-
+import org.apache.qpid.transport.codec.BBDecoder;
/**
* Assembler
*
*/
-
public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
{
+ // Use a small array to store incomplete Methods for low-value channels, instead of allocating a huge
+ // array or always boxing the channelId and looking it up in the map. This value must be of the form 2^X - 1.
+ private static final int ARRAY_SIZE = 0xFF;
+ private final Method[] _incompleteMethodArray = new Method[ARRAY_SIZE + 1];
+ private final Map<Integer, Method> _incompleteMethodMap = new HashMap<Integer, Method>();
private final Receiver<ProtocolEvent> receiver;
private final Map<Integer,List<Frame>> segments;
- private final Method[] incomplete;
- private final ThreadLocal<BBDecoder> decoder = new ThreadLocal<BBDecoder>()
+ private static final ThreadLocal<BBDecoder> _decoder = new ThreadLocal<BBDecoder>()
{
public BBDecoder initialValue()
{
@@ -63,7 +61,6 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
{
this.receiver = receiver;
segments = new HashMap<Integer,List<Frame>>();
- incomplete = new Method[64*1024];
}
private int segmentKey(Frame frame)
@@ -169,7 +166,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
private void assemble(Frame frame, ByteBuffer segment)
{
- BBDecoder dec = decoder.get();
+ BBDecoder dec = _decoder.get();
dec.init(segment);
int channel = frame.getChannel();
@@ -192,7 +189,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
command.read(dec);
if (command.hasPayload())
{
- incomplete[channel] = command;
+ setIncompleteCommand(channel, command);
}
else
{
@@ -200,8 +197,8 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
}
break;
case HEADER:
- command = incomplete[channel];
- List<Struct> structs = new ArrayList(2);
+ command = getIncompleteCommand(channel);
+ List<Struct> structs = new ArrayList<Struct>(2);
while (dec.hasRemaining())
{
structs.add(dec.readStruct32());
@@ -209,19 +206,51 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
command.setHeader(new Header(structs));
if (frame.isLastSegment())
{
- incomplete[channel] = null;
+ setIncompleteCommand(channel, null);
emit(channel, command);
}
break;
case BODY:
- command = incomplete[channel];
+ command = getIncompleteCommand(channel);
command.setBody(segment);
- incomplete[channel] = null;
+ setIncompleteCommand(channel, null);
emit(channel, command);
break;
default:
throw new IllegalStateException("unknown frame type: " + frame.getType());
}
+
+ dec.releaseBuffer();
}
+ private void setIncompleteCommand(int channelId, Method incomplete)
+ {
+ if ((channelId & ARRAY_SIZE) == channelId)
+ {
+ _incompleteMethodArray[channelId] = incomplete;
+ }
+ else
+ {
+ if(incomplete != null)
+ {
+ _incompleteMethodMap.put(channelId, incomplete);
+ }
+ else
+ {
+ _incompleteMethodMap.remove(channelId);
+ }
+ }
+ }
+
+ private Method getIncompleteCommand(int channelId)
+ {
+ if ((channelId & ARRAY_SIZE) == channelId)
+ {
+ return _incompleteMethodArray[channelId];
+ }
+ else
+ {
+ return _incompleteMethodMap.get(channelId);
+ }
+ }
}