diff options
Diffstat (limited to 'qpid/java/common/src/main')
12 files changed, 219 insertions, 151 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java deleted file mode 100644 index 220e33724a..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.codec; - -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; - -/** - * AMQCodecFactory is a Mina codec factory. It supplies the encoders and decoders need to read and write the bytes to - * the wire. - */ -public class AMQCodecFactory -{ - - /** Holds the protocol decoder. */ - private final AMQDecoder _frameDecoder; - - /** - * Creates a new codec factory, specifiying whether it is expected that the first frame of data should be an - * initiation. This is the case for the broker, which always expects to received the protocol initiation on a newly - * connected client. - * - * @param expectProtocolInitiation <tt>true</tt> if the first frame received is going to be a protocol initiation - * frame, <tt>false</tt> if it is going to be a standard AMQ data block. - * @param session protocol session (connection) - */ - public AMQCodecFactory(boolean expectProtocolInitiation, AMQVersionAwareProtocolSession session) - { - _frameDecoder = new AMQDecoder(expectProtocolInitiation, session); - } - - - /** - * Gets the AMQP decoder. - * - * @return The AMQP decoder. - */ - public AMQDecoder getDecoder() - { - return _frameDecoder; - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java index 3ccd7e2031..ebecb7b483 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java @@ -20,6 +20,16 @@ */ package org.apache.qpid.codec; +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; + import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQDataBlockDecoder; import org.apache.qpid.framing.AMQFrameDecodingException; @@ -31,16 +41,6 @@ import org.apache.qpid.framing.EncodingUtils; import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; -import java.io.ByteArrayInputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.ListIterator; - /** * AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a * protocol initiation decoder. It is a cumulative decoder, which means that it can accumulate data to decode in the @@ -66,6 +66,8 @@ public class AMQDecoder private AMQMethodBodyFactory _bodyFactory; + private boolean _firstRead = true; + private List<ByteArrayInputStream> _remainingBufs = new ArrayList<ByteArrayInputStream>(); /** @@ -94,6 +96,11 @@ public class AMQDecoder _expectProtocolInitiation = expectProtocolInitiation; } + public void setMaxFrameSize(final long frameMax) + { + _dataBlockDecoder.setMaxFrameSize(frameMax); + } + private class RemainingByteArrayInputStream extends InputStream { private int _currentListPos; @@ -234,6 +241,17 @@ public class AMQDecoder msg = new ByteArrayDataInput(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining()); } + // If this is the first read then we may be getting a protocol initiation back if we tried to negotiate + // an unsupported version + if(_firstRead && buf.hasRemaining()) + { + _firstRead = false; + if(!_expectProtocolInitiation && buf.get(buf.position()) > 8) + { + _expectProtocolInitiation = true; + } + } + boolean enoughData = true; while (enoughData) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index 9d5e654ad0..d00ddf4074 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -20,12 +20,13 @@ */ package org.apache.qpid.framing; +import java.io.IOException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.codec.MarkableDataInput; - -import java.io.IOException; +import org.apache.qpid.protocol.AMQConstant; public class AMQDataBlockDecoder { @@ -40,6 +41,7 @@ public class AMQDataBlockDecoder } private Logger _logger = LoggerFactory.getLogger(AMQDataBlockDecoder.class); + private long _maxFrameSize = AMQConstant.FRAME_MIN_SIZE.getCode(); public AMQDataBlockDecoder() { } @@ -59,14 +61,17 @@ public class AMQDataBlockDecoder // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt() final long bodySize = in.readInt() & 0xffffffffL; - + if(bodySize > _maxFrameSize) + { + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Incoming frame size of "+bodySize+" is larger than negotiated maximum of " + _maxFrameSize); + } in.reset(); return (remainingAfterAttributes >= bodySize); } - public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, MarkableDataInput in) + public AMQFrame createAndPopulateFrame(BodyFactory methodBodyFactory, MarkableDataInput in) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException { final byte type = in.readByte(); @@ -83,7 +88,7 @@ public class AMQDataBlockDecoder if (bodyFactory == null) { - throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null); + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Unsupported frame type: " + type); } final int channel = in.readUnsignedShort(); @@ -92,8 +97,8 @@ public class AMQDataBlockDecoder // bodySize can be zero if ((channel < 0) || (bodySize < 0)) { - throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type + " channel = " + channel - + " bodySize = " + bodySize, null); + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Undecodable frame: type = " + type + " channel = " + channel + + " bodySize = " + bodySize); } AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory); @@ -101,11 +106,15 @@ public class AMQDataBlockDecoder byte marker = in.readByte(); if ((marker & 0xFF) != 0xCE) { - throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize - + " type=" + type, null); + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "End of frame marker not found. Read " + marker + " length=" + bodySize + + " type=" + type); } return frame; } + public void setMaxFrameSize(final long maxFrameSize) + { + _maxFrameSize = maxFrameSize; + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java index b0c92d9aab..b55a48067d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java @@ -20,17 +20,12 @@ */ package org.apache.qpid.framing; -import org.apache.qpid.AMQException; +import org.apache.qpid.transport.TransportException; -/** - * AMQProtocolHeaderException indicates a format error in an AMQP frame header. - * <p> - * TODO Not an AMQP exception as no status code. - */ -public class AMQProtocolHeaderException extends AMQException +public class AMQProtocolHeaderException extends TransportException { public AMQProtocolHeaderException(String message, Throwable cause) { - super(null, message, cause); + super(message, cause); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index d48cd1754c..1866e1fd15 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -20,20 +20,21 @@ */ package org.apache.qpid.transport; -import org.apache.qpid.common.QpidProperties; -import org.apache.qpid.configuration.ClientProperties; -import org.apache.qpid.properties.ConnectionStartProperties; -import org.apache.qpid.transport.util.Logger; - import static org.apache.qpid.transport.Connection.State.OPEN; import static org.apache.qpid.transport.Connection.State.RESUMING; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; import java.util.HashMap; import java.util.List; import java.util.Map; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; + +import org.apache.qpid.common.QpidProperties; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.properties.ConnectionStartProperties; +import org.apache.qpid.transport.util.Logger; + /** * ClientDelegate @@ -138,13 +139,24 @@ public class ClientDelegate extends ConnectionDelegate int actualHeartbeatInterval = calculateHeartbeatInterval(heartbeatInterval, tune.getHeartbeatMin(), tune.getHeartbeatMax()); + int maxFrameSize = tune.getMaxFrameSize(); + int settingsMaxFrameSize = conn.getConnectionSettings().getMaxFrameSize(); + if(maxFrameSize == 0 && settingsMaxFrameSize != 0 && settingsMaxFrameSize < 0xffff) + { + maxFrameSize = Math.max(Constant.MIN_MAX_FRAME_SIZE, settingsMaxFrameSize); + } + else if(maxFrameSize != 0 && settingsMaxFrameSize != 0) + { + maxFrameSize = Math.max(Constant.MIN_MAX_FRAME_SIZE, Math.min(maxFrameSize, settingsMaxFrameSize)); + } conn.connectionTuneOk(tune.getChannelMax(), - tune.getMaxFrameSize(), + maxFrameSize, actualHeartbeatInterval); int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor); conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor)); conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval); + conn.setMaxFrameSize(maxFrameSize == 0 ? 0xffff : maxFrameSize); conn.setIdleTimeout(idleTimeout); int channelMax = tune.getChannelMax(); @@ -183,7 +195,7 @@ public class ClientDelegate extends ConnectionDelegate /** * Currently the spec specified the min and max for heartbeat using secs */ - private int calculateHeartbeatInterval(int heartbeat,int min, int max) + int calculateHeartbeatInterval(int heartbeat,int min, int max) { int i = heartbeat; if (i == 0) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 7c604e8e8e..44cb30e735 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -20,23 +20,12 @@ */ package org.apache.qpid.transport; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.transport.network.*; -import org.apache.qpid.transport.network.security.SecurityLayer; -import org.apache.qpid.transport.network.security.SecurityLayerFactory; -import org.apache.qpid.transport.util.Logger; -import org.apache.qpid.transport.util.Waiter; -import org.apache.qpid.util.Strings; - import static org.apache.qpid.transport.Connection.State.CLOSED; import static org.apache.qpid.transport.Connection.State.CLOSING; import static org.apache.qpid.transport.Connection.State.NEW; import static org.apache.qpid.transport.Connection.State.OPEN; import static org.apache.qpid.transport.Connection.State.OPENING; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslServer; - import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -48,6 +37,23 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslServer; + +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.transport.network.Assembler; +import org.apache.qpid.transport.network.Disassembler; +import org.apache.qpid.transport.network.InputHandler; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.Transport; +import org.apache.qpid.transport.network.TransportActivity; +import org.apache.qpid.transport.network.security.SecurityLayer; +import org.apache.qpid.transport.network.security.SecurityLayerFactory; +import org.apache.qpid.transport.util.Logger; +import org.apache.qpid.transport.util.Waiter; +import org.apache.qpid.util.Strings; + /** * Connection @@ -71,7 +77,7 @@ public class Connection extends ConnectionInvoker private long _lastSendTime; private long _lastReadTime; private NetworkConnection _networkConnection; - + private FrameSizeObserver _frameSizeObserver; public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING } @@ -224,7 +230,9 @@ public class Connection extends ConnectionInvoker securityLayer = SecurityLayerFactory.newInstance(getConnectionSettings()); OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10); - Receiver<ByteBuffer> secureReceiver = securityLayer.receiver(new InputHandler(new Assembler(this))); + final InputHandler inputHandler = new InputHandler(new Assembler(this)); + addFrameSizeObserver(inputHandler); + Receiver<ByteBuffer> secureReceiver = securityLayer.receiver(inputHandler); if(secureReceiver instanceof ConnectionListener) { addConnectionListener((ConnectionListener)secureReceiver); @@ -241,7 +249,9 @@ public class Connection extends ConnectionInvoker { addConnectionListener((ConnectionListener)secureSender); } - sender = new Disassembler(secureSender, settings.getMaxFrameSize()); + Disassembler disassembler = new Disassembler(secureSender, Constant.MIN_MAX_FRAME_SIZE); + sender = disassembler; + addFrameSizeObserver(disassembler); send(new ProtocolHeader(1, 0, 10)); @@ -809,4 +819,33 @@ public class Connection extends ConnectionInvoker { return _networkConnection; } + + public void setMaxFrameSize(final int maxFrameSize) + { + if(_frameSizeObserver != null) + { + _frameSizeObserver.setMaxFrameSize(maxFrameSize); + } + } + + public void addFrameSizeObserver(final FrameSizeObserver frameSizeObserver) + { + if(_frameSizeObserver == null) + { + _frameSizeObserver = frameSizeObserver; + } + else + { + final FrameSizeObserver currentObserver = _frameSizeObserver; + _frameSizeObserver = new FrameSizeObserver() + { + @Override + public void setMaxFrameSize(final int frameSize) + { + currentObserver.setMaxFrameSize(frameSize); + frameSizeObserver.setMaxFrameSize(frameSize); + } + }; + } + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/FrameSizeObserver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/FrameSizeObserver.java new file mode 100644 index 0000000000..94d0080fbb --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/FrameSizeObserver.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +public interface FrameSizeObserver +{ + void setMaxFrameSize(int frameSize); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java index 1e0d5b9698..82a677b8f7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java @@ -20,18 +20,19 @@ */ package org.apache.qpid.transport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import static org.apache.qpid.transport.Connection.State.OPEN; -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; import java.util.Collections; import java.util.List; import java.util.Map; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * ServerDelegate */ @@ -136,12 +137,14 @@ public class ServerDelegate extends ConnectionDelegate protected void tuneAuthorizedConnection(final Connection conn) { - conn.connectionTune - (getChannelMax(), - org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE, - 0, getHeartbeatMax()); + conn.connectionTune(getChannelMax(), getFrameMax(), 0, getHeartbeatMax()); } - + + protected int getFrameMax() + { + return org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE; + } + protected void secure(final Connection conn, final byte[] response) { final SaslServer ss = conn.getSaslServer(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java index 5a5de597c2..26e8f1850b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java @@ -20,17 +20,18 @@ */ package org.apache.qpid.transport.network; +import java.nio.ByteBuffer; + import org.apache.qpid.transport.Binding; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ConnectionDelegate; import org.apache.qpid.transport.ConnectionListener; +import org.apache.qpid.transport.Constant; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.security.sasl.SASLReceiver; import org.apache.qpid.transport.network.security.sasl.SASLSender; -import java.nio.ByteBuffer; - /** * ConnectionBinding * @@ -80,23 +81,26 @@ public abstract class ConnectionBinding } // XXX: hardcoded max-frame - Disassembler dis = new Disassembler(sender, MAX_FRAME_SIZE); + Disassembler dis = new Disassembler(sender, Constant.MIN_MAX_FRAME_SIZE); + conn.addFrameSizeObserver(dis); conn.setSender(dis); return conn; } public Receiver<ByteBuffer> receiver(Connection conn) { - if (conn.getConnectionSettings() != null && + final InputHandler inputHandler = new InputHandler(new Assembler(conn)); + conn.addFrameSizeObserver(inputHandler); + if (conn.getConnectionSettings() != null && conn.getConnectionSettings().isUseSASLEncryption()) { - SASLReceiver receiver = new SASLReceiver(new InputHandler(new Assembler(conn))); - conn.addConnectionListener((ConnectionListener)receiver); + SASLReceiver receiver = new SASLReceiver(inputHandler); + conn.addConnectionListener(receiver); return receiver; } else { - return new InputHandler(new Assembler(conn)); + return inputHandler; } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java index fe437ecf93..a804cb2f9d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java @@ -20,6 +20,17 @@ */ package org.apache.qpid.transport.network; +import static java.lang.Math.min; +import static org.apache.qpid.transport.network.Frame.FIRST_FRAME; +import static org.apache.qpid.transport.network.Frame.FIRST_SEG; +import static org.apache.qpid.transport.network.Frame.HEADER_SIZE; +import static org.apache.qpid.transport.network.Frame.LAST_FRAME; +import static org.apache.qpid.transport.network.Frame.LAST_SEG; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import org.apache.qpid.transport.FrameSizeObserver; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.ProtocolDelegate; @@ -31,24 +42,13 @@ import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.Struct; import org.apache.qpid.transport.codec.BBEncoder; -import static org.apache.qpid.transport.network.Frame.FIRST_FRAME; -import static org.apache.qpid.transport.network.Frame.FIRST_SEG; -import static org.apache.qpid.transport.network.Frame.HEADER_SIZE; -import static org.apache.qpid.transport.network.Frame.LAST_FRAME; -import static org.apache.qpid.transport.network.Frame.LAST_SEG; - -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - -import static java.lang.Math.min; - /** * Disassembler */ -public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void> +public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void>, FrameSizeObserver { private final Sender<ByteBuffer> sender; - private final int maxPayload; + private int maxPayload; private final Object sendlock = new Object(); private final static ThreadLocal<BBEncoder> _encoder = new ThreadLocal<BBEncoder>() { @@ -60,11 +60,11 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega public Disassembler(Sender<ByteBuffer> sender, int maxFrame) { + this.sender = sender; if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024) { throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame); } - this.sender = sender; this.maxPayload = maxFrame - HEADER_SIZE; } @@ -255,4 +255,15 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega { sender.setIdleTimeout(i); } + + @Override + public void setMaxFrameSize(final int maxFrame) + { + if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024) + { + throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame); + } + this.maxPayload = maxFrame - HEADER_SIZE; + + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java index 9416c4c9fa..e810d9e8ae 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java @@ -20,12 +20,12 @@ */ package org.apache.qpid.transport.network; -import org.apache.qpid.transport.SegmentType; - import static org.apache.qpid.transport.util.Functions.str; import java.nio.ByteBuffer; +import org.apache.qpid.transport.SegmentType; + /** * Frame diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java index 86e05db818..758c2e1eda 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java @@ -20,11 +20,6 @@ */ package org.apache.qpid.transport.network; -import org.apache.qpid.transport.ProtocolError; -import org.apache.qpid.transport.ProtocolHeader; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.SegmentType; - import static org.apache.qpid.transport.network.InputHandler.State.ERROR; import static org.apache.qpid.transport.network.InputHandler.State.FRAME_BODY; import static org.apache.qpid.transport.network.InputHandler.State.FRAME_HDR; @@ -34,6 +29,13 @@ import static org.apache.qpid.transport.util.Functions.str; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import org.apache.qpid.transport.Constant; +import org.apache.qpid.transport.FrameSizeObserver; +import org.apache.qpid.transport.ProtocolError; +import org.apache.qpid.transport.ProtocolHeader; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.SegmentType; + /** * InputHandler @@ -41,15 +43,17 @@ import java.nio.ByteOrder; * @author Rafael H. Schloming */ -public class InputHandler implements Receiver<ByteBuffer> +public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver { + private int _maxFrameSize = Constant.MIN_MAX_FRAME_SIZE; + public enum State { PROTO_HDR, FRAME_HDR, FRAME_BODY, - ERROR; + ERROR } private final Receiver<NetworkEvent> receiver; @@ -83,6 +87,11 @@ public class InputHandler implements Receiver<ByteBuffer> this(receiver, PROTO_HDR); } + public void setMaxFrameSize(final int maxFrameSize) + { + _maxFrameSize = maxFrameSize; + } + private void error(String fmt, Object ... args) { receiver.received(new ProtocolError(Frame.L1, fmt, args)); @@ -158,7 +167,8 @@ public class InputHandler implements Receiver<ByteBuffer> type = SegmentType.get(input.get(pos + 1)); int size = (0xFFFF & input.getShort(pos + 2)); size -= Frame.HEADER_SIZE; - if (size < 0 || size > (64*1024 - 12)) + _maxFrameSize = 64 * 1024; + if (size < 0 || size > (_maxFrameSize - 12)) { error("bad frame size: %d", size); return ERROR; |