summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java59
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java38
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java27
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java11
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java30
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java67
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/FrameSizeObserver.java26
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java25
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java18
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java39
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java26
12 files changed, 219 insertions, 151 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
deleted file mode 100644
index 220e33724a..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.codec;
-
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
-/**
- * AMQCodecFactory is a Mina codec factory. It supplies the encoders and decoders need to read and write the bytes to
- * the wire.
- */
-public class AMQCodecFactory
-{
-
- /** Holds the protocol decoder. */
- private final AMQDecoder _frameDecoder;
-
- /**
- * Creates a new codec factory, specifiying whether it is expected that the first frame of data should be an
- * initiation. This is the case for the broker, which always expects to received the protocol initiation on a newly
- * connected client.
- *
- * @param expectProtocolInitiation <tt>true</tt> if the first frame received is going to be a protocol initiation
- * frame, <tt>false</tt> if it is going to be a standard AMQ data block.
- * @param session protocol session (connection)
- */
- public AMQCodecFactory(boolean expectProtocolInitiation, AMQVersionAwareProtocolSession session)
- {
- _frameDecoder = new AMQDecoder(expectProtocolInitiation, session);
- }
-
-
- /**
- * Gets the AMQP decoder.
- *
- * @return The AMQP decoder.
- */
- public AMQDecoder getDecoder()
- {
- return _frameDecoder;
- }
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
index 3ccd7e2031..ebecb7b483 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
@@ -20,6 +20,16 @@
*/
package org.apache.qpid.codec;
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQDataBlockDecoder;
import org.apache.qpid.framing.AMQFrameDecodingException;
@@ -31,16 +41,6 @@ import org.apache.qpid.framing.EncodingUtils;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.ListIterator;
-
/**
* AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a
* protocol initiation decoder. It is a cumulative decoder, which means that it can accumulate data to decode in the
@@ -66,6 +66,8 @@ public class AMQDecoder
private AMQMethodBodyFactory _bodyFactory;
+ private boolean _firstRead = true;
+
private List<ByteArrayInputStream> _remainingBufs = new ArrayList<ByteArrayInputStream>();
/**
@@ -94,6 +96,11 @@ public class AMQDecoder
_expectProtocolInitiation = expectProtocolInitiation;
}
+ public void setMaxFrameSize(final long frameMax)
+ {
+ _dataBlockDecoder.setMaxFrameSize(frameMax);
+ }
+
private class RemainingByteArrayInputStream extends InputStream
{
private int _currentListPos;
@@ -234,6 +241,17 @@ public class AMQDecoder
msg = new ByteArrayDataInput(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining());
}
+ // If this is the first read then we may be getting a protocol initiation back if we tried to negotiate
+ // an unsupported version
+ if(_firstRead && buf.hasRemaining())
+ {
+ _firstRead = false;
+ if(!_expectProtocolInitiation && buf.get(buf.position()) > 8)
+ {
+ _expectProtocolInitiation = true;
+ }
+ }
+
boolean enoughData = true;
while (enoughData)
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
index 9d5e654ad0..d00ddf4074 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
@@ -20,12 +20,13 @@
*/
package org.apache.qpid.framing;
+import java.io.IOException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.codec.MarkableDataInput;
-
-import java.io.IOException;
+import org.apache.qpid.protocol.AMQConstant;
public class AMQDataBlockDecoder
{
@@ -40,6 +41,7 @@ public class AMQDataBlockDecoder
}
private Logger _logger = LoggerFactory.getLogger(AMQDataBlockDecoder.class);
+ private long _maxFrameSize = AMQConstant.FRAME_MIN_SIZE.getCode();
public AMQDataBlockDecoder()
{ }
@@ -59,14 +61,17 @@ public class AMQDataBlockDecoder
// Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt()
final long bodySize = in.readInt() & 0xffffffffL;
-
+ if(bodySize > _maxFrameSize)
+ {
+ throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Incoming frame size of "+bodySize+" is larger than negotiated maximum of " + _maxFrameSize);
+ }
in.reset();
return (remainingAfterAttributes >= bodySize);
}
- public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, MarkableDataInput in)
+ public AMQFrame createAndPopulateFrame(BodyFactory methodBodyFactory, MarkableDataInput in)
throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
{
final byte type = in.readByte();
@@ -83,7 +88,7 @@ public class AMQDataBlockDecoder
if (bodyFactory == null)
{
- throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null);
+ throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Unsupported frame type: " + type);
}
final int channel = in.readUnsignedShort();
@@ -92,8 +97,8 @@ public class AMQDataBlockDecoder
// bodySize can be zero
if ((channel < 0) || (bodySize < 0))
{
- throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type + " channel = " + channel
- + " bodySize = " + bodySize, null);
+ throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Undecodable frame: type = " + type + " channel = " + channel
+ + " bodySize = " + bodySize);
}
AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
@@ -101,11 +106,15 @@ public class AMQDataBlockDecoder
byte marker = in.readByte();
if ((marker & 0xFF) != 0xCE)
{
- throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize
- + " type=" + type, null);
+ throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "End of frame marker not found. Read " + marker + " length=" + bodySize
+ + " type=" + type);
}
return frame;
}
+ public void setMaxFrameSize(final long maxFrameSize)
+ {
+ _maxFrameSize = maxFrameSize;
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java
index b0c92d9aab..b55a48067d 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java
@@ -20,17 +20,12 @@
*/
package org.apache.qpid.framing;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.TransportException;
-/**
- * AMQProtocolHeaderException indicates a format error in an AMQP frame header.
- * <p>
- * TODO Not an AMQP exception as no status code.
- */
-public class AMQProtocolHeaderException extends AMQException
+public class AMQProtocolHeaderException extends TransportException
{
public AMQProtocolHeaderException(String message, Throwable cause)
{
- super(null, message, cause);
+ super(message, cause);
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
index d48cd1754c..1866e1fd15 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
@@ -20,20 +20,21 @@
*/
package org.apache.qpid.transport;
-import org.apache.qpid.common.QpidProperties;
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.properties.ConnectionStartProperties;
-import org.apache.qpid.transport.util.Logger;
-
import static org.apache.qpid.transport.Connection.State.OPEN;
import static org.apache.qpid.transport.Connection.State.RESUMING;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import org.apache.qpid.common.QpidProperties;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.properties.ConnectionStartProperties;
+import org.apache.qpid.transport.util.Logger;
+
/**
* ClientDelegate
@@ -138,13 +139,24 @@ public class ClientDelegate extends ConnectionDelegate
int actualHeartbeatInterval = calculateHeartbeatInterval(heartbeatInterval,
tune.getHeartbeatMin(),
tune.getHeartbeatMax());
+ int maxFrameSize = tune.getMaxFrameSize();
+ int settingsMaxFrameSize = conn.getConnectionSettings().getMaxFrameSize();
+ if(maxFrameSize == 0 && settingsMaxFrameSize != 0 && settingsMaxFrameSize < 0xffff)
+ {
+ maxFrameSize = Math.max(Constant.MIN_MAX_FRAME_SIZE, settingsMaxFrameSize);
+ }
+ else if(maxFrameSize != 0 && settingsMaxFrameSize != 0)
+ {
+ maxFrameSize = Math.max(Constant.MIN_MAX_FRAME_SIZE, Math.min(maxFrameSize, settingsMaxFrameSize));
+ }
conn.connectionTuneOk(tune.getChannelMax(),
- tune.getMaxFrameSize(),
+ maxFrameSize,
actualHeartbeatInterval);
int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor);
conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor));
conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval);
+ conn.setMaxFrameSize(maxFrameSize == 0 ? 0xffff : maxFrameSize);
conn.setIdleTimeout(idleTimeout);
int channelMax = tune.getChannelMax();
@@ -183,7 +195,7 @@ public class ClientDelegate extends ConnectionDelegate
/**
* Currently the spec specified the min and max for heartbeat using secs
*/
- private int calculateHeartbeatInterval(int heartbeat,int min, int max)
+ int calculateHeartbeatInterval(int heartbeat,int min, int max)
{
int i = heartbeat;
if (i == 0)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 7c604e8e8e..44cb30e735 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -20,23 +20,12 @@
*/
package org.apache.qpid.transport;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.transport.network.*;
-import org.apache.qpid.transport.network.security.SecurityLayer;
-import org.apache.qpid.transport.network.security.SecurityLayerFactory;
-import org.apache.qpid.transport.util.Logger;
-import org.apache.qpid.transport.util.Waiter;
-import org.apache.qpid.util.Strings;
-
import static org.apache.qpid.transport.Connection.State.CLOSED;
import static org.apache.qpid.transport.Connection.State.CLOSING;
import static org.apache.qpid.transport.Connection.State.NEW;
import static org.apache.qpid.transport.Connection.State.OPEN;
import static org.apache.qpid.transport.Connection.State.OPENING;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslServer;
-
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -48,6 +37,23 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslServer;
+
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.transport.network.InputHandler;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.Transport;
+import org.apache.qpid.transport.network.TransportActivity;
+import org.apache.qpid.transport.network.security.SecurityLayer;
+import org.apache.qpid.transport.network.security.SecurityLayerFactory;
+import org.apache.qpid.transport.util.Logger;
+import org.apache.qpid.transport.util.Waiter;
+import org.apache.qpid.util.Strings;
+
/**
* Connection
@@ -71,7 +77,7 @@ public class Connection extends ConnectionInvoker
private long _lastSendTime;
private long _lastReadTime;
private NetworkConnection _networkConnection;
-
+ private FrameSizeObserver _frameSizeObserver;
public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
@@ -224,7 +230,9 @@ public class Connection extends ConnectionInvoker
securityLayer = SecurityLayerFactory.newInstance(getConnectionSettings());
OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10);
- Receiver<ByteBuffer> secureReceiver = securityLayer.receiver(new InputHandler(new Assembler(this)));
+ final InputHandler inputHandler = new InputHandler(new Assembler(this));
+ addFrameSizeObserver(inputHandler);
+ Receiver<ByteBuffer> secureReceiver = securityLayer.receiver(inputHandler);
if(secureReceiver instanceof ConnectionListener)
{
addConnectionListener((ConnectionListener)secureReceiver);
@@ -241,7 +249,9 @@ public class Connection extends ConnectionInvoker
{
addConnectionListener((ConnectionListener)secureSender);
}
- sender = new Disassembler(secureSender, settings.getMaxFrameSize());
+ Disassembler disassembler = new Disassembler(secureSender, Constant.MIN_MAX_FRAME_SIZE);
+ sender = disassembler;
+ addFrameSizeObserver(disassembler);
send(new ProtocolHeader(1, 0, 10));
@@ -809,4 +819,33 @@ public class Connection extends ConnectionInvoker
{
return _networkConnection;
}
+
+ public void setMaxFrameSize(final int maxFrameSize)
+ {
+ if(_frameSizeObserver != null)
+ {
+ _frameSizeObserver.setMaxFrameSize(maxFrameSize);
+ }
+ }
+
+ public void addFrameSizeObserver(final FrameSizeObserver frameSizeObserver)
+ {
+ if(_frameSizeObserver == null)
+ {
+ _frameSizeObserver = frameSizeObserver;
+ }
+ else
+ {
+ final FrameSizeObserver currentObserver = _frameSizeObserver;
+ _frameSizeObserver = new FrameSizeObserver()
+ {
+ @Override
+ public void setMaxFrameSize(final int frameSize)
+ {
+ currentObserver.setMaxFrameSize(frameSize);
+ frameSizeObserver.setMaxFrameSize(frameSize);
+ }
+ };
+ }
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/FrameSizeObserver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/FrameSizeObserver.java
new file mode 100644
index 0000000000..94d0080fbb
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/FrameSizeObserver.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+public interface FrameSizeObserver
+{
+ void setMaxFrameSize(int frameSize);
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
index 1e0d5b9698..82a677b8f7 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
@@ -20,18 +20,19 @@
*/
package org.apache.qpid.transport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import static org.apache.qpid.transport.Connection.State.OPEN;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* ServerDelegate
*/
@@ -136,12 +137,14 @@ public class ServerDelegate extends ConnectionDelegate
protected void tuneAuthorizedConnection(final Connection conn)
{
- conn.connectionTune
- (getChannelMax(),
- org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
- 0, getHeartbeatMax());
+ conn.connectionTune(getChannelMax(), getFrameMax(), 0, getHeartbeatMax());
}
-
+
+ protected int getFrameMax()
+ {
+ return org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE;
+ }
+
protected void secure(final Connection conn, final byte[] response)
{
final SaslServer ss = conn.getSaslServer();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
index 5a5de597c2..26e8f1850b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
@@ -20,17 +20,18 @@
*/
package org.apache.qpid.transport.network;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.transport.Binding;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionDelegate;
import org.apache.qpid.transport.ConnectionListener;
+import org.apache.qpid.transport.Constant;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.security.sasl.SASLReceiver;
import org.apache.qpid.transport.network.security.sasl.SASLSender;
-import java.nio.ByteBuffer;
-
/**
* ConnectionBinding
*
@@ -80,23 +81,26 @@ public abstract class ConnectionBinding
}
// XXX: hardcoded max-frame
- Disassembler dis = new Disassembler(sender, MAX_FRAME_SIZE);
+ Disassembler dis = new Disassembler(sender, Constant.MIN_MAX_FRAME_SIZE);
+ conn.addFrameSizeObserver(dis);
conn.setSender(dis);
return conn;
}
public Receiver<ByteBuffer> receiver(Connection conn)
{
- if (conn.getConnectionSettings() != null &&
+ final InputHandler inputHandler = new InputHandler(new Assembler(conn));
+ conn.addFrameSizeObserver(inputHandler);
+ if (conn.getConnectionSettings() != null &&
conn.getConnectionSettings().isUseSASLEncryption())
{
- SASLReceiver receiver = new SASLReceiver(new InputHandler(new Assembler(conn)));
- conn.addConnectionListener((ConnectionListener)receiver);
+ SASLReceiver receiver = new SASLReceiver(inputHandler);
+ conn.addConnectionListener(receiver);
return receiver;
}
else
{
- return new InputHandler(new Assembler(conn));
+ return inputHandler;
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
index fe437ecf93..a804cb2f9d 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
@@ -20,6 +20,17 @@
*/
package org.apache.qpid.transport.network;
+import static java.lang.Math.min;
+import static org.apache.qpid.transport.network.Frame.FIRST_FRAME;
+import static org.apache.qpid.transport.network.Frame.FIRST_SEG;
+import static org.apache.qpid.transport.network.Frame.HEADER_SIZE;
+import static org.apache.qpid.transport.network.Frame.LAST_FRAME;
+import static org.apache.qpid.transport.network.Frame.LAST_SEG;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.qpid.transport.FrameSizeObserver;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolDelegate;
@@ -31,24 +42,13 @@ import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.Struct;
import org.apache.qpid.transport.codec.BBEncoder;
-import static org.apache.qpid.transport.network.Frame.FIRST_FRAME;
-import static org.apache.qpid.transport.network.Frame.FIRST_SEG;
-import static org.apache.qpid.transport.network.Frame.HEADER_SIZE;
-import static org.apache.qpid.transport.network.Frame.LAST_FRAME;
-import static org.apache.qpid.transport.network.Frame.LAST_SEG;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-import static java.lang.Math.min;
-
/**
* Disassembler
*/
-public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void>
+public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void>, FrameSizeObserver
{
private final Sender<ByteBuffer> sender;
- private final int maxPayload;
+ private int maxPayload;
private final Object sendlock = new Object();
private final static ThreadLocal<BBEncoder> _encoder = new ThreadLocal<BBEncoder>()
{
@@ -60,11 +60,11 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega
public Disassembler(Sender<ByteBuffer> sender, int maxFrame)
{
+ this.sender = sender;
if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
{
throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
}
- this.sender = sender;
this.maxPayload = maxFrame - HEADER_SIZE;
}
@@ -255,4 +255,15 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega
{
sender.setIdleTimeout(i);
}
+
+ @Override
+ public void setMaxFrameSize(final int maxFrame)
+ {
+ if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
+ {
+ throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
+ }
+ this.maxPayload = maxFrame - HEADER_SIZE;
+
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java
index 9416c4c9fa..e810d9e8ae 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java
@@ -20,12 +20,12 @@
*/
package org.apache.qpid.transport.network;
-import org.apache.qpid.transport.SegmentType;
-
import static org.apache.qpid.transport.util.Functions.str;
import java.nio.ByteBuffer;
+import org.apache.qpid.transport.SegmentType;
+
/**
* Frame
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
index 86e05db818..758c2e1eda 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
@@ -20,11 +20,6 @@
*/
package org.apache.qpid.transport.network;
-import org.apache.qpid.transport.ProtocolError;
-import org.apache.qpid.transport.ProtocolHeader;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.SegmentType;
-
import static org.apache.qpid.transport.network.InputHandler.State.ERROR;
import static org.apache.qpid.transport.network.InputHandler.State.FRAME_BODY;
import static org.apache.qpid.transport.network.InputHandler.State.FRAME_HDR;
@@ -34,6 +29,13 @@ import static org.apache.qpid.transport.util.Functions.str;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import org.apache.qpid.transport.Constant;
+import org.apache.qpid.transport.FrameSizeObserver;
+import org.apache.qpid.transport.ProtocolError;
+import org.apache.qpid.transport.ProtocolHeader;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.SegmentType;
+
/**
* InputHandler
@@ -41,15 +43,17 @@ import java.nio.ByteOrder;
* @author Rafael H. Schloming
*/
-public class InputHandler implements Receiver<ByteBuffer>
+public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver
{
+ private int _maxFrameSize = Constant.MIN_MAX_FRAME_SIZE;
+
public enum State
{
PROTO_HDR,
FRAME_HDR,
FRAME_BODY,
- ERROR;
+ ERROR
}
private final Receiver<NetworkEvent> receiver;
@@ -83,6 +87,11 @@ public class InputHandler implements Receiver<ByteBuffer>
this(receiver, PROTO_HDR);
}
+ public void setMaxFrameSize(final int maxFrameSize)
+ {
+ _maxFrameSize = maxFrameSize;
+ }
+
private void error(String fmt, Object ... args)
{
receiver.received(new ProtocolError(Frame.L1, fmt, args));
@@ -158,7 +167,8 @@ public class InputHandler implements Receiver<ByteBuffer>
type = SegmentType.get(input.get(pos + 1));
int size = (0xFFFF & input.getShort(pos + 2));
size -= Frame.HEADER_SIZE;
- if (size < 0 || size > (64*1024 - 12))
+ _maxFrameSize = 64 * 1024;
+ if (size < 0 || size > (_maxFrameSize - 12))
{
error("bad frame size: %d", size);
return ERROR;