diff options
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/transport')
6 files changed, 92 insertions, 61 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java index 4e97855a6f..491a7ac218 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java @@ -142,4 +142,13 @@ public final class Binary return str(ByteBuffer.wrap(bytes, offset, size)); } + public boolean hasExcessCapacity() + { + return size != bytes.length; + } + + public Binary copy() + { + return new Binary(getBytes()); + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java index b8e7616a37..f21df251da 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java @@ -35,9 +35,7 @@ import org.slf4j.LoggerFactory; /** * ServerDelegate - * */ - public class ServerDelegate extends ConnectionDelegate { protected static final Logger _logger = LoggerFactory.getLogger(ServerDelegate.class); @@ -140,12 +138,12 @@ public class ServerDelegate extends ConnectionDelegate protected int getHeartbeatMax() { - return Integer.MAX_VALUE; + return 0xFFFF; } protected int getChannelMax() { - return Integer.MAX_VALUE; + return 0xFFFF; } @Override @@ -202,5 +200,4 @@ public class ServerDelegate extends ConnectionDelegate ssn.sessionAttached(atc.getName()); ssn.setState(Session.State.OPEN); } - } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java index a8a4997ae7..09ce6a7eb1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java @@ -143,10 +143,18 @@ abstract class AbstractDecoder implements Decoder short size = readUint8(); Binary bin = get(size); String str = str8cache.get(bin); + if (str == null) { str = decode(bin.array(), bin.offset(), bin.size(), "UTF-8"); - str8cache.put(bin, str); + if(bin.hasExcessCapacity()) + { + str8cache.put(bin.copy(), str); + } + else + { + str8cache.put(bin, str); + } } return str; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java index 6f7a2fa3b2..10f67e1cd6 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java @@ -41,6 +41,11 @@ public final class BBDecoder extends AbstractDecoder this.in.order(ByteOrder.BIG_ENDIAN); } + public void releaseBuffer() + { + in = null; + } + protected byte doGet() { return in.get(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java index 357caa26e1..1a85ab88a5 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java @@ -20,38 +20,36 @@ */ package org.apache.qpid.transport.network; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.nio.ByteBuffer; - -import org.apache.qpid.transport.codec.BBDecoder; -import org.apache.qpid.transport.codec.Decoder; - import org.apache.qpid.transport.Header; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.ProtocolError; import org.apache.qpid.transport.ProtocolEvent; import org.apache.qpid.transport.ProtocolHeader; import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.SegmentType; import org.apache.qpid.transport.Struct; - +import org.apache.qpid.transport.codec.BBDecoder; /** * Assembler * */ - public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate { + // Use a small array to store incomplete Methods for low-value channels, instead of allocating a huge + // array or always boxing the channelId and looking it up in the map. This value must be of the form 2^X - 1. + private static final int ARRAY_SIZE = 0xFF; + private final Method[] _incompleteMethodArray = new Method[ARRAY_SIZE + 1]; + private final Map<Integer, Method> _incompleteMethodMap = new HashMap<Integer, Method>(); private final Receiver<ProtocolEvent> receiver; private final Map<Integer,List<Frame>> segments; - private final Method[] incomplete; - private final ThreadLocal<BBDecoder> decoder = new ThreadLocal<BBDecoder>() + private static final ThreadLocal<BBDecoder> _decoder = new ThreadLocal<BBDecoder>() { public BBDecoder initialValue() { @@ -63,7 +61,6 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate { this.receiver = receiver; segments = new HashMap<Integer,List<Frame>>(); - incomplete = new Method[64*1024]; } private int segmentKey(Frame frame) @@ -169,7 +166,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate private void assemble(Frame frame, ByteBuffer segment) { - BBDecoder dec = decoder.get(); + BBDecoder dec = _decoder.get(); dec.init(segment); int channel = frame.getChannel(); @@ -192,7 +189,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate command.read(dec); if (command.hasPayload()) { - incomplete[channel] = command; + setIncompleteCommand(channel, command); } else { @@ -200,8 +197,8 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate } break; case HEADER: - command = incomplete[channel]; - List<Struct> structs = new ArrayList(2); + command = getIncompleteCommand(channel); + List<Struct> structs = new ArrayList<Struct>(2); while (dec.hasRemaining()) { structs.add(dec.readStruct32()); @@ -209,19 +206,51 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate command.setHeader(new Header(structs)); if (frame.isLastSegment()) { - incomplete[channel] = null; + setIncompleteCommand(channel, null); emit(channel, command); } break; case BODY: - command = incomplete[channel]; + command = getIncompleteCommand(channel); command.setBody(segment); - incomplete[channel] = null; + setIncompleteCommand(channel, null); emit(channel, command); break; default: throw new IllegalStateException("unknown frame type: " + frame.getType()); } + + dec.releaseBuffer(); } + private void setIncompleteCommand(int channelId, Method incomplete) + { + if ((channelId & ARRAY_SIZE) == channelId) + { + _incompleteMethodArray[channelId] = incomplete; + } + else + { + if(incomplete != null) + { + _incompleteMethodMap.put(channelId, incomplete); + } + else + { + _incompleteMethodMap.remove(channelId); + } + } + } + + private Method getIncompleteCommand(int channelId) + { + if ((channelId & ARRAY_SIZE) == channelId) + { + return _incompleteMethodArray[channelId]; + } + else + { + return _incompleteMethodMap.get(channelId); + } + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java index ab174b00b3..685034d1a9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java @@ -40,21 +40,15 @@ import static java.lang.Math.min; import java.nio.ByteBuffer; import java.nio.ByteOrder; - /** * Disassembler - * */ - -public final class Disassembler implements Sender<ProtocolEvent>, - ProtocolDelegate<Void> +public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void> { - private final Sender<ByteBuffer> sender; private final int maxPayload; - private final ByteBuffer header; private final Object sendlock = new Object(); - private final ThreadLocal<BBEncoder> encoder = new ThreadLocal<BBEncoder>() + private final static ThreadLocal<BBEncoder> _encoder = new ThreadLocal<BBEncoder>() { public BBEncoder initialValue() { @@ -66,14 +60,10 @@ public final class Disassembler implements Sender<ProtocolEvent>, { if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024) { - throw new IllegalArgumentException - ("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame); + throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame); } this.sender = sender; this.maxPayload = maxFrame - HEADER_SIZE; - this.header = ByteBuffer.allocate(HEADER_SIZE); - this.header.order(ByteOrder.BIG_ENDIAN); - } public void send(ProtocolEvent event) @@ -101,25 +91,27 @@ public final class Disassembler implements Sender<ProtocolEvent>, { synchronized (sendlock) { - header.put(0, flags); - header.put(1, type); - header.putShort(2, (short) (size + HEADER_SIZE)); - header.put(5, track); - header.putShort(6, (short) channel); - - header.rewind(); - - sender.send(header); + ByteBuffer data = ByteBuffer.allocate(size + HEADER_SIZE); + data.order(ByteOrder.BIG_ENDIAN); + + data.put(0, flags); + data.put(1, type); + data.putShort(2, (short) (size + HEADER_SIZE)); + data.put(5, track); + data.putShort(6, (short) channel); + data.position(HEADER_SIZE); int limit = buf.limit(); buf.limit(buf.position() + size); - sender.send(buf); + data.put(buf); buf.limit(limit); + + data.rewind(); + sender.send(data); } } - private void fragment(byte flags, SegmentType type, ProtocolEvent event, - ByteBuffer buf) + private void fragment(byte flags, SegmentType type, ProtocolEvent event, ByteBuffer buf) { byte typeb = (byte) type.getValue(); byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0; @@ -170,17 +162,9 @@ public final class Disassembler implements Sender<ProtocolEvent>, method(method, SegmentType.COMMAND); } - private ByteBuffer copy(ByteBuffer src) - { - ByteBuffer buf = ByteBuffer.allocate(src.remaining()); - buf.put(src); - buf.flip(); - return buf; - } - private void method(Method method, SegmentType type) { - BBEncoder enc = encoder.get(); + BBEncoder enc = _encoder.get(); enc.init(); enc.writeUint16(method.getEncodedType()); if (type == SegmentType.COMMAND) @@ -227,8 +211,7 @@ public final class Disassembler implements Sender<ProtocolEvent>, if (payload) { ByteBuffer body = method.getBody(); - fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, - method, headerSeg); + fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, headerSeg); if (body != null) { fragment(LAST_SEG, SegmentType.BODY, method, body); @@ -240,7 +223,7 @@ public final class Disassembler implements Sender<ProtocolEvent>, public void error(Void v, ProtocolError error) { - throw new IllegalArgumentException("" + error); + throw new IllegalArgumentException(String.valueOf(error)); } public void setIdleTimeout(int i) |