diff options
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java')
-rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java | 65 |
1 files changed, 47 insertions, 18 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java index 357caa26e1..1a85ab88a5 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java @@ -20,38 +20,36 @@ */ package org.apache.qpid.transport.network; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.nio.ByteBuffer; - -import org.apache.qpid.transport.codec.BBDecoder; -import org.apache.qpid.transport.codec.Decoder; - import org.apache.qpid.transport.Header; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.ProtocolError; import org.apache.qpid.transport.ProtocolEvent; import org.apache.qpid.transport.ProtocolHeader; import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.SegmentType; import org.apache.qpid.transport.Struct; - +import org.apache.qpid.transport.codec.BBDecoder; /** * Assembler * */ - public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate { + // Use a small array to store incomplete Methods for low-value channels, instead of allocating a huge + // array or always boxing the channelId and looking it up in the map. This value must be of the form 2^X - 1. + private static final int ARRAY_SIZE = 0xFF; + private final Method[] _incompleteMethodArray = new Method[ARRAY_SIZE + 1]; + private final Map<Integer, Method> _incompleteMethodMap = new HashMap<Integer, Method>(); private final Receiver<ProtocolEvent> receiver; private final Map<Integer,List<Frame>> segments; - private final Method[] incomplete; - private final ThreadLocal<BBDecoder> decoder = new ThreadLocal<BBDecoder>() + private static final ThreadLocal<BBDecoder> _decoder = new ThreadLocal<BBDecoder>() { public BBDecoder initialValue() { @@ -63,7 +61,6 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate { this.receiver = receiver; segments = new HashMap<Integer,List<Frame>>(); - incomplete = new Method[64*1024]; } private int segmentKey(Frame frame) @@ -169,7 +166,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate private void assemble(Frame frame, ByteBuffer segment) { - BBDecoder dec = decoder.get(); + BBDecoder dec = _decoder.get(); dec.init(segment); int channel = frame.getChannel(); @@ -192,7 +189,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate command.read(dec); if (command.hasPayload()) { - incomplete[channel] = command; + setIncompleteCommand(channel, command); } else { @@ -200,8 +197,8 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate } break; case HEADER: - command = incomplete[channel]; - List<Struct> structs = new ArrayList(2); + command = getIncompleteCommand(channel); + List<Struct> structs = new ArrayList<Struct>(2); while (dec.hasRemaining()) { structs.add(dec.readStruct32()); @@ -209,19 +206,51 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate command.setHeader(new Header(structs)); if (frame.isLastSegment()) { - incomplete[channel] = null; + setIncompleteCommand(channel, null); emit(channel, command); } break; case BODY: - command = incomplete[channel]; + command = getIncompleteCommand(channel); command.setBody(segment); - incomplete[channel] = null; + setIncompleteCommand(channel, null); emit(channel, command); break; default: throw new IllegalStateException("unknown frame type: " + frame.getType()); } + + dec.releaseBuffer(); } + private void setIncompleteCommand(int channelId, Method incomplete) + { + if ((channelId & ARRAY_SIZE) == channelId) + { + _incompleteMethodArray[channelId] = incomplete; + } + else + { + if(incomplete != null) + { + _incompleteMethodMap.put(channelId, incomplete); + } + else + { + _incompleteMethodMap.remove(channelId); + } + } + } + + private Method getIncompleteCommand(int channelId) + { + if ((channelId & ARRAY_SIZE) == channelId) + { + return _incompleteMethodArray[channelId]; + } + else + { + return _incompleteMethodMap.get(channelId); + } + } } |