summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java')
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java349
1 files changed, 349 insertions, 0 deletions
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
new file mode 100644
index 0000000000..322b971487
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
@@ -0,0 +1,349 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import javax.naming.NamingException;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import org.apache.qpid.codec.MarkableDataInput;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlockDecoder;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQProtocolVersionException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BodyFactory;
+import org.apache.qpid.framing.ByteArrayDataInput;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionStartOkBody;
+import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.amqp_0_91.ConnectionStartOkBodyImpl;
+import org.apache.qpid.framing.amqp_0_91.ConnectionTuneOkBodyImpl;
+import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_8;
+import org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_9;
+import org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_9_1;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+
+public class MaxFrameSizeTest extends QpidBrokerTestCase
+{
+
+ @Override
+ public void setUp() throws Exception
+ {
+ // don't call super.setup() as we want a change to set stuff up before the broker starts
+ // super.setUp();
+ }
+
+ public void testTooSmallFrameSize() throws Exception
+ {
+
+ getBrokerConfiguration().setObjectAttribute(AuthenticationProvider.class,
+ TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER,
+ "secureOnlyMechanisms",
+ "[]");
+ super.setUp();
+
+ if(isBroker010())
+ {
+ Connection conn = new Connection();
+ final ConnectionSettings settings = new ConnectionSettings();
+ BrokerDetails brokerDetails = getConnectionFactory().getConnectionURL().getAllBrokerDetails().get(0);
+ settings.setHost(brokerDetails.getHost());
+ settings.setPort(brokerDetails.getPort());
+ settings.setUsername(GUEST_USERNAME);
+ settings.setPassword(GUEST_PASSWORD);
+ final ConnectionDelegate clientDelegate = new TestClientDelegate(settings, 1024);
+ conn.setConnectionDelegate(clientDelegate);
+ try
+ {
+ conn.connect(settings);
+ fail("Connection should not be possible with a frame size < " + Constant.MIN_MAX_FRAME_SIZE);
+ }
+ catch(ConnectionException e)
+ {
+ // pass
+ }
+
+ }
+ else
+ {
+ doAMQP08test(1024, new ResultEvaluator()
+ {
+
+ @Override
+ public void evaluate(final Socket socket, final List<AMQFrame> frames)
+ {
+ if(!socket.isClosed())
+ {
+ AMQFrame lastFrame = frames.get(frames.size() - 1);
+ assertTrue("Connection should not be possible with a frame size < " + Constant.MIN_MAX_FRAME_SIZE, lastFrame.getBodyFrame() instanceof ConnectionCloseBody);
+ }
+ }
+ });
+ }
+ }
+
+
+ public void testTooLargeFrameSize() throws Exception
+ {
+ getBrokerConfiguration().setObjectAttribute(AuthenticationProvider.class,
+ TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER,
+ "secureOnlyMechanisms",
+ "[]");
+ setTestSystemProperty(Broker.BROKER_FRAME_SIZE, "8192");
+ super.setUp();
+ if(isBroker010())
+ {
+ Connection conn = new Connection();
+ final ConnectionSettings settings = new ConnectionSettings();
+ BrokerDetails brokerDetails = getConnectionFactory().getConnectionURL().getAllBrokerDetails().get(0);
+ settings.setHost(brokerDetails.getHost());
+ settings.setPort(brokerDetails.getPort());
+ settings.setUsername(GUEST_USERNAME);
+ settings.setPassword(GUEST_PASSWORD);
+ final ConnectionDelegate clientDelegate = new TestClientDelegate(settings, 0xffff);
+ conn.setConnectionDelegate(clientDelegate);
+ try
+ {
+ conn.connect(settings);
+ fail("Connection should not be possible with a frame size larger than the broker requested");
+ }
+ catch(ConnectionException e)
+ {
+ // pass
+ }
+
+ }
+ else
+ {
+ doAMQP08test(10000, new ResultEvaluator()
+ {
+
+ @Override
+ public void evaluate(final Socket socket, final List<AMQFrame> frames)
+ {
+ if(!socket.isClosed())
+ {
+ AMQFrame lastFrame = frames.get(frames.size() - 1);
+ assertTrue("Connection should not be possible with a frame size larger than the broker requested", lastFrame.getBodyFrame() instanceof ConnectionCloseBody);
+ }
+ }
+ });
+ }
+ }
+
+ private static interface ResultEvaluator
+ {
+ void evaluate(Socket socket, List<AMQFrame> frames);
+ }
+
+ private void doAMQP08test(int frameSize, ResultEvaluator evaluator)
+ throws NamingException, IOException, AMQFrameDecodingException, AMQProtocolVersionException
+ {
+ BrokerDetails brokerDetails = getConnectionFactory().getConnectionURL().getAllBrokerDetails().get(0);
+
+ Socket socket = new Socket(brokerDetails.getHost(), brokerDetails.getPort());
+ socket.setTcpNoDelay(true);
+ OutputStream os = socket.getOutputStream();
+
+ byte[] protocolHeader;
+ Protocol protocol = getBrokerProtocol();
+ switch(protocol)
+ {
+ case AMQP_0_8:
+ protocolHeader = (ProtocolEngineCreator_0_8.getInstance().getHeaderIdentifier());
+ break;
+ case AMQP_0_9:
+ protocolHeader = (ProtocolEngineCreator_0_9.getInstance().getHeaderIdentifier());
+ break;
+ case AMQP_0_9_1:
+ protocolHeader = (ProtocolEngineCreator_0_9_1.getInstance().getHeaderIdentifier());
+ break;
+ default:
+ throw new RuntimeException("Unexpected Protocol Version: " + protocol);
+ }
+ os.write(protocolHeader);
+ InputStream is = socket.getInputStream();
+
+ final byte[] response = new byte[2+GUEST_USERNAME.length()+GUEST_PASSWORD.length()];
+ int i = 1;
+ for(byte b : GUEST_USERNAME.getBytes(StandardCharsets.US_ASCII))
+ {
+ response[i++] = b;
+ }
+ i++;
+ for(byte b : GUEST_PASSWORD.getBytes(StandardCharsets.US_ASCII))
+ {
+ response[i++] = b;
+ }
+
+ ConnectionStartOkBody startOK = new ConnectionStartOkBodyImpl(new FieldTable(), AMQShortString.valueOf("PLAIN"), response, AMQShortString.valueOf("en_US"));
+
+ DataOutputStream dos = new DataOutputStream(os);
+ new AMQFrame(0, startOK).writePayload(dos);
+ dos.flush();
+ ConnectionTuneOkBody tuneOk = new ConnectionTuneOkBodyImpl(256, frameSize, 0);
+ new AMQFrame(0, tuneOk).writePayload(dos);
+ dos.flush();
+ socket.setSoTimeout(5000);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ byte[] buffer = new byte[1024];
+ int size;
+ while((size = is.read(buffer)) > 0)
+ {
+ baos.write(buffer,0,size);
+ }
+
+ byte[] serverData = baos.toByteArray();
+ ByteArrayDataInput badi = new ByteArrayDataInput(serverData);
+ AMQDataBlockDecoder datablockDecoder = new AMQDataBlockDecoder();
+ final MethodRegistry_0_91 methodRegistry_0_91 = new MethodRegistry_0_91();
+ BodyFactory methodBodyFactory = new BodyFactory()
+ {
+ @Override
+ public AMQBody createBody(final MarkableDataInput in, final long bodySize)
+ throws AMQFrameDecodingException, IOException
+ {
+ return methodRegistry_0_91.convertToBody(in, bodySize);
+ }
+ };
+
+ List<AMQFrame> frames = new ArrayList<>();
+ while (datablockDecoder.decodable(badi))
+ {
+ frames.add(datablockDecoder.createAndPopulateFrame(methodBodyFactory, badi));
+ }
+
+ evaluator.evaluate(socket, frames);
+ }
+
+ private static class TestClientDelegate extends ClientDelegate
+ {
+
+ private final int _maxFrameSize;
+
+ public TestClientDelegate(final ConnectionSettings settings, final int maxFrameSize)
+ {
+ super(settings);
+ _maxFrameSize = maxFrameSize;
+ }
+
+ @Override
+ protected SaslClient createSaslClient(final List<Object> brokerMechs) throws ConnectionException, SaslException
+ {
+ final CallbackHandler handler = new CallbackHandler()
+ {
+ @Override
+ public void handle(final Callback[] callbacks) throws IOException, UnsupportedCallbackException
+ {
+ for (int i = 0; i < callbacks.length; i++)
+ {
+ Callback cb = callbacks[i];
+ if (cb instanceof NameCallback)
+ {
+ ((NameCallback)cb).setName(GUEST_USERNAME);
+ }
+ else if (cb instanceof PasswordCallback)
+ {
+ ((PasswordCallback)cb).setPassword(GUEST_PASSWORD.toCharArray());
+ }
+ else
+ {
+ throw new UnsupportedCallbackException(cb);
+ }
+ }
+
+ }
+ };
+ String[] selectedMechs = {};
+ for(String mech : new String[] { "ANONYMOUS", "PLAIN", "CRAM-MD5", "SCRAM-SHA-1", "SCRAM-SHA-256"})
+ {
+ if(brokerMechs.contains(mech))
+ {
+ selectedMechs = new String[] {mech};
+ break;
+ }
+ }
+
+
+ return Sasl.createSaslClient(selectedMechs,
+ null,
+ getConnectionSettings().getSaslProtocol(),
+ getConnectionSettings().getSaslServerName(),
+ Collections.<String,Object>emptyMap(),
+ handler);
+
+ }
+
+ @Override
+ public void connectionTune(Connection conn, ConnectionTune tune)
+ {
+ int heartbeatInterval = getConnectionSettings().getHeartbeatInterval010();
+ float heartbeatTimeoutFactor = getConnectionSettings().getHeartbeatTimeoutFactor();
+ int actualHeartbeatInterval = calculateHeartbeatInterval(heartbeatInterval,
+ tune.getHeartbeatMin(),
+ tune.getHeartbeatMax());
+
+ conn.connectionTuneOk(tune.getChannelMax(),
+ _maxFrameSize,
+ actualHeartbeatInterval);
+
+ int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor);
+ conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor));
+ conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval);
+ conn.setMaxFrameSize(_maxFrameSize);
+
+
+ conn.setIdleTimeout(idleTimeout);
+
+ int channelMax = tune.getChannelMax();
+ conn.setChannelMax(channelMax == 0 ? Connection.MAX_CHANNEL_MAX : channelMax);
+
+ conn.connectionOpen(getConnectionSettings().getVhost(), null, Option.INSIST);
+ }
+
+ }
+}