diff options
-rw-r--r-- | java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java | 55 |
1 files changed, 42 insertions, 13 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java index b3c419959c..1a85ab88a5 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java @@ -20,15 +20,12 @@ */ package org.apache.qpid.transport.network; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.nio.ByteBuffer; - -import org.apache.qpid.transport.codec.BBDecoder; - import org.apache.qpid.transport.Header; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.ProtocolError; @@ -36,19 +33,22 @@ import org.apache.qpid.transport.ProtocolEvent; import org.apache.qpid.transport.ProtocolHeader; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Struct; - +import org.apache.qpid.transport.codec.BBDecoder; /** * Assembler * */ - public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate { + // Use a small array to store incomplete Methods for low-value channels, instead of allocating a huge + // array or always boxing the channelId and looking it up in the map. This value must be of the form 2^X - 1. + private static final int ARRAY_SIZE = 0xFF; + private final Method[] _incompleteMethodArray = new Method[ARRAY_SIZE + 1]; + private final Map<Integer, Method> _incompleteMethodMap = new HashMap<Integer, Method>(); private final Receiver<ProtocolEvent> receiver; private final Map<Integer,List<Frame>> segments; - private final Method[] incomplete; private static final ThreadLocal<BBDecoder> _decoder = new ThreadLocal<BBDecoder>() { public BBDecoder initialValue() @@ -61,7 +61,6 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate { this.receiver = receiver; segments = new HashMap<Integer,List<Frame>>(); - incomplete = new Method[64*1024]; } private int segmentKey(Frame frame) @@ -190,7 +189,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate command.read(dec); if (command.hasPayload()) { - incomplete[channel] = command; + setIncompleteCommand(channel, command); } else { @@ -198,7 +197,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate } break; case HEADER: - command = incomplete[channel]; + command = getIncompleteCommand(channel); List<Struct> structs = new ArrayList<Struct>(2); while (dec.hasRemaining()) { @@ -207,14 +206,14 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate command.setHeader(new Header(structs)); if (frame.isLastSegment()) { - incomplete[channel] = null; + setIncompleteCommand(channel, null); emit(channel, command); } break; case BODY: - command = incomplete[channel]; + command = getIncompleteCommand(channel); command.setBody(segment); - incomplete[channel] = null; + setIncompleteCommand(channel, null); emit(channel, command); break; default: @@ -224,4 +223,34 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate dec.releaseBuffer(); } + private void setIncompleteCommand(int channelId, Method incomplete) + { + if ((channelId & ARRAY_SIZE) == channelId) + { + _incompleteMethodArray[channelId] = incomplete; + } + else + { + if(incomplete != null) + { + _incompleteMethodMap.put(channelId, incomplete); + } + else + { + _incompleteMethodMap.remove(channelId); + } + } + } + + private Method getIncompleteCommand(int channelId) + { + if ((channelId & ARRAY_SIZE) == channelId) + { + return _incompleteMethodArray[channelId]; + } + else + { + return _incompleteMethodMap.get(channelId); + } + } } |