diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-08-25 14:48:20 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-08-25 14:48:20 +0000 |
commit | 2a7c8b3061fda47cc53ef997c339599dd2285395 (patch) | |
tree | 40334230aa105819bb4e1bc0ea7794e39050c64b | |
parent | 717bfa2e17d949bf0771ca14fb15bc99dd41f9fd (diff) | |
download | qpid-python-2a7c8b3061fda47cc53ef997c339599dd2285395.tar.gz |
Merging from trunk r1616861:1617235 in the Java tree
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.30@1620330 13f79535-47bb-0310-9956-ffa450edef68
41 files changed, 916 insertions, 322 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index 83a2054793..cacb04736c 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -71,7 +71,6 @@ import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironment import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; import org.apache.qpid.server.store.berkeleydb.replication.ReplicationGroupListener; import org.apache.qpid.server.util.ServerScopedRuntimeException; -import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost; import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl; import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode; import org.codehaus.jackson.map.ObjectMapper; @@ -324,7 +323,13 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } finally { - stopEnvironment(); + closeEnvironment(); + + // closing the environment does not cause a state change. Adjust the role + // so that our observers will see DETACHED rather than our previous role in the group. + ReplicatedEnvironment.State detached = ReplicatedEnvironment.State.DETACHED; + _lastReplicatedEnvironmentState.set(detached); + attributeSet(ROLE, _role, detached); //Perhaps, having STOPPED operational logging could be sufficient. However, on START we still will be seeing 2 logs: ATTACHED and STARTED getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DETACHED(getName(), getGroupName())); @@ -345,7 +350,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } } - private void stopEnvironment() + private void closeEnvironment() { ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade(); if (environmentFacade != null && _environmentFacade.compareAndSet(environmentFacade, null)) @@ -412,7 +417,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } finally { - stopEnvironment(); + closeEnvironment(); getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DETACHED(getName(), getGroupName())); } } @@ -847,6 +852,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu if (nodeState == null) { remoteNode.setRole(ReplicatedEnvironment.State.UNKNOWN.name()); + remoteNode.setLastTransactionId(-1); if (!remoteNode.isDetached()) { getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DETACHED(remoteNode.getName(), getGroupName())); @@ -909,7 +915,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } finally { - stopEnvironment(); + closeEnvironment(); } notifyStateChanged(state, State.ERRORED); } diff --git a/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js b/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js index f927b81c9d..b5e12a664e 100644 --- a/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js +++ b/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js @@ -99,7 +99,7 @@ define(["dojo/_base/xhr", { name: 'Role', field: 'role', width: '10%' }, { name: 'Address', field: 'address', width: '35%' }, { name: 'Join Time', field: 'joinTime', width: '25%', formatter: function(value){ return value ? UserPreferences.formatDateTime(value) : "";} }, - { name: 'Replication Transaction ID', field: 'lastKnownReplicationTransactionId', width: '20%' } + { name: 'Replication Transaction ID', field: 'lastKnownReplicationTransactionId', width: '20%', formatter: function(value){ return value > 0 ? value : "N/A";} } ], null, { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java index fb382a8ca9..ccda1e1fe1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java @@ -40,10 +40,6 @@ public class BrokerProperties public static final String PROPERTY_DEFAULT_SUPPORTED_PROTOCOL_REPLY = "qpid.broker_default_supported_protocol_version_reply"; public static final String PROPERTY_DISABLED_FEATURES = "qpid.broker_disabled_features"; - private static final int DEFAULT_FRAME_SIZE = 65535; - public static final String PROPERTY_FRAME_SIZE = "qpid.broker_frame_size"; - public static final int FRAME_SIZE = Integer.getInteger(PROPERTY_FRAME_SIZE, DEFAULT_FRAME_SIZE); - public static final String PROPERTY_BROKER_DEFAULT_AMQP_PROTOCOL_EXCLUDES = "qpid.broker_default_amqp_protocol_excludes"; public static final String PROPERTY_BROKER_DEFAULT_AMQP_PROTOCOL_INCLUDES = "qpid.broker_default_amqp_protocol_includes"; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java index 7d49d0b85f..982ebb01c6 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java @@ -78,6 +78,11 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL @ManagedContextDefault(name = BROKER_FLOW_TO_DISK_THRESHOLD) public static final long DEFAULT_FLOW_TO_DISK_THRESHOLD = (long)(0.4 * (double)Runtime.getRuntime().maxMemory()); + String BROKER_FRAME_SIZE = "qpid.broker_frame_size"; + @ManagedContextDefault(name = BROKER_FRAME_SIZE) + long DEFAULT_FRAME_SIZE = 65535; + + @DerivedAttribute String getBuildVersion(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java index 8dabd3eed6..e98ff1a79a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java @@ -38,6 +38,12 @@ public interface Port<X extends Port<X>> extends ConfiguredObject<X> String KEY_STORE = "keyStore"; String TRUST_STORES = "trustStores"; + + String CONNECTION_MAXIMUM_AUTHENTICATION_DELAY = "connection.maximumAuthenticationDelay"; + + @ManagedContextDefault(name = CONNECTION_MAXIMUM_AUTHENTICATION_DELAY) + long DEFAULT_MAX_CONNECTION_AUTHENTICATION_DELAY = 10000l; + // Attributes @ManagedAttribute(defaultValue = "*") diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java index c9045999b9..5041e22104 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -89,7 +89,7 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends long getLastIoTime(); - Port getPort(); + Port<?> getPort(); Transport getTransport(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index fefd04e81d..0eabcd725e 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -293,7 +293,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine else { msgheader.limit(_header.remaining()); - msg.position(_header.remaining()); + msg.position(msg.position()+_header.remaining()); } _header.put(msgheader); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index e6afbc6e90..f614ff5847 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -584,7 +584,7 @@ public class MockConsumer implements ConsumerTarget } @Override - public Port getPort() + public Port<?> getPort() { return null; } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index cefd1ee0b2..dc60a37a7f 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -20,27 +20,32 @@ */ package org.apache.qpid.server.protocol.v0_10; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.security.AccessController; +import java.security.PrivilegedAction; + +import javax.security.auth.Subject; + +import org.apache.log4j.Logger; + import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.transport.Constant; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.Assembler; import org.apache.qpid.transport.network.Disassembler; import org.apache.qpid.transport.network.InputHandler; import org.apache.qpid.transport.network.NetworkConnection; -import javax.security.auth.Subject; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.security.AccessController; -import java.security.PrivilegedAction; - public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocolEngine { public static final int MAX_FRAME_SIZE = 64 * 1024 - 1; + private static final Logger _logger = Logger.getLogger(ProtocolEngine_0_10.class); + private NetworkConnection _network; private long _readBytes; @@ -87,7 +92,9 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol _network = network; _connection.setNetworkConnection(network); - _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE)); + Disassembler disassembler = new Disassembler(wrapSender(sender), Constant.MIN_MAX_FRAME_SIZE); + _connection.setSender(disassembler); + _connection.addFrameSizeObserver(disassembler); // FIXME Two log messages to maintain compatibility with earlier protocol versions _connection.getEventLogger().message(ConnectionMessages.OPEN(null, "0-10", null, null, false, true, false, false)); @@ -154,6 +161,26 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol public void received(final ByteBuffer buf) { _lastReadTime = System.currentTimeMillis(); + if(_connection.getAuthorizedPrincipal() == null && + (_lastReadTime - _createTime) > _connection.getPort().getContextValue(Long.class, + Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY) ) + { + Subject.doAs(_connection.getAuthorizedSubject(), new PrivilegedAction<Object>() + { + @Override + public Object run() + { + + _logger.warn("Connection has taken more than " + + _connection.getPort() + .getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY) + + "ms to establish identity. Closing as possible DoS."); + _connection.getEventLogger().message(ConnectionMessages.IDLE_CLOSE()); + _network.close(); + return null; + } + }); + } super.received(buf); _connection.receivedComplete(); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 2ad79ad980..8ddd04f51a 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -75,7 +75,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S private final long _connectionId; private final Object _reference = new Object(); private VirtualHostImpl _virtualHost; - private Port _port; + private Port<?> _port; private AtomicLong _lastIoTime = new AtomicLong(); private boolean _blocking; private Transport _transport; @@ -189,12 +189,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S } @Override - public Port getPort() + public Port<?> getPort() { return _port; } - public void setPort(Port port) + public void setPort(Port<?> port) { _port = port; } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index 7751ff765d..bab2d802e8 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -50,21 +50,7 @@ import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import org.apache.qpid.transport.Binary; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.ConnectionClose; -import org.apache.qpid.transport.ConnectionCloseCode; -import org.apache.qpid.transport.ConnectionOpen; -import org.apache.qpid.transport.ConnectionOpenOk; -import org.apache.qpid.transport.ConnectionStartOk; -import org.apache.qpid.transport.ConnectionTuneOk; -import org.apache.qpid.transport.ServerDelegate; -import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.SessionAttach; -import org.apache.qpid.transport.SessionDelegate; -import org.apache.qpid.transport.SessionDetach; -import org.apache.qpid.transport.SessionDetachCode; -import org.apache.qpid.transport.SessionDetached; +import org.apache.qpid.transport.*; import org.apache.qpid.transport.network.NetworkConnection; public class ServerConnectionDelegate extends ServerDelegate @@ -76,15 +62,16 @@ public class ServerConnectionDelegate extends ServerDelegate private int _maxNoOfChannels; private Map<String,Object> _clientProperties; private final SubjectCreator _subjectCreator; + private int _maximumFrameSize; - public ServerConnectionDelegate(Broker broker, String localFQDN, SubjectCreator subjectCreator) + public ServerConnectionDelegate(Broker<?> broker, String localFQDN, SubjectCreator subjectCreator) { this(createConnectionProperties(broker), Collections.singletonList((Object)"en_US"), broker, localFQDN, subjectCreator); } private ServerConnectionDelegate(Map<String, Object> properties, List<Object> locales, - Broker broker, + Broker<?> broker, String localFQDN, SubjectCreator subjectCreator) { @@ -94,9 +81,10 @@ public class ServerConnectionDelegate extends ServerDelegate _localFQDN = localFQDN; _maxNoOfChannels = broker.getConnection_sessionCountLimit(); _subjectCreator = subjectCreator; + _maximumFrameSize = (int) Math.min(0xffffl, broker.getContextValue(Long.class, Broker.BROKER_FRAME_SIZE)); } - private static List<String> getFeatures(Broker broker) + private static List<String> getFeatures(Broker<?> broker) { String brokerDisabledFeatures = System.getProperty(BrokerProperties.PROPERTY_DISABLED_FEATURES); final List<String> features = new ArrayList<String>(); @@ -108,7 +96,7 @@ public class ServerConnectionDelegate extends ServerDelegate return Collections.unmodifiableList(features); } - private static Map<String, Object> createConnectionProperties(final Broker broker) + private static Map<String, Object> createConnectionProperties(final Broker<?> broker) { final Map<String,Object> map = new HashMap<String,Object>(); // Federation tag is used by the client to identify the broker instance @@ -234,6 +222,7 @@ public class ServerConnectionDelegate extends ServerDelegate { ServerConnection sconn = (ServerConnection) conn; int okChannelMax = ok.getChannelMax(); + int okMaxFrameSize = ok.getMaxFrameSize(); if (okChannelMax > getChannelMax()) { @@ -246,6 +235,31 @@ public class ServerConnectionDelegate extends ServerDelegate return; } + if(okMaxFrameSize > getFrameMax()) + { + LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " + + "client connectionTuneOk returned a frameMax (" + okMaxFrameSize + + ") above the server's offered limit (" + getFrameMax() +")"); + + //Due to the error we must forcefully close the connection without negotiation + sconn.getSender().close(); + return; + } + else if(okMaxFrameSize > 0 && okMaxFrameSize < Constant.MIN_MAX_FRAME_SIZE) + { + LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " + + "client connectionTuneOk returned a frameMax (" + okMaxFrameSize + + ") below the minimum permitted size (" + Constant.MIN_MAX_FRAME_SIZE +")"); + + //Due to the error we must forcefully close the connection without negotiation + sconn.getSender().close(); + return; + } + else if(okMaxFrameSize == 0) + { + okMaxFrameSize = getFrameMax(); + } + final NetworkConnection networkConnection = sconn.getNetworkConnection(); if(ok.hasHeartbeat()) { @@ -266,6 +280,8 @@ public class ServerConnectionDelegate extends ServerDelegate } setConnectionTuneOkChannelMax(sconn, okChannelMax); + + conn.setMaxFrameSize(okMaxFrameSize); } @Override @@ -279,6 +295,12 @@ public class ServerConnectionDelegate extends ServerDelegate _maxNoOfChannels = channelMax; } + @Override + protected int getFrameMax() + { + return _maximumFrameSize; + } + @Override public void sessionDetach(Connection conn, SessionDetach dtc) { // To ensure a clean detach, we stop any remaining subscriptions. Stop ensures diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 46a9430814..1c264e52c6 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -44,37 +44,38 @@ import javax.security.auth.Subject; import javax.security.sasl.SaslServer; import org.apache.log4j.Logger; + import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.*; -import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.common.ServerPropertyNames; +import org.apache.qpid.framing.*; import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.logging.EventLogger; -import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.protocol.SessionModelListener; -import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.SessionModelListener; +import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterRegistry; -import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; +import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -93,7 +94,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi // channels. This value must be of the form 2^x - 1. private static final int CHANNEL_CACHE_SIZE = 0xff; private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024; - private final Port _port; + private final Port<?> _port; + private final long _creationTime; private AMQShortString _contextKey; @@ -123,7 +125,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private final AMQStateManager _stateManager; - private AMQCodecFactory _codecFactory; + private AMQDecoder _decoder; private SaslServer _saslServer; @@ -166,12 +168,13 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private final ReentrantLock _receivedLock; private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis()); - private final Broker _broker; + private final Broker<?> _broker; private final Transport _transport; private volatile boolean _closeWhenNoRoute; private volatile boolean _stopped; private long _readBytes; + private boolean _authenticated; public AMQProtocolEngine(Broker broker, final NetworkConnection network, @@ -185,7 +188,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _maxNoOfChannels = broker.getConnection_sessionCountLimit(); _receivedLock = new ReentrantLock(); _stateManager = new AMQStateManager(broker, this); - _codecFactory = new AMQCodecFactory(true, this); + _decoder = new AMQDecoder(true, this); _connectionID = connectionId; _logSubject = new ConnectionLogSubject(this); @@ -210,6 +213,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID()); _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID()); _dataReceived = new StatisticsCounter("data-received-" + getSessionID()); + _creationTime = System.currentTimeMillis(); } private <T> T runAsSubject(PrivilegedAction<T> action) @@ -247,6 +251,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void setMaxFrameSize(long frameMax) { _maxFrameSize = frameMax; + _decoder.setMaxFrameSize(frameMax); } public long getMaxFrameSize() @@ -277,7 +282,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi @Override public Void run() { + final long arrivalTime = System.currentTimeMillis(); + if(!_authenticated && + (arrivalTime - _creationTime) > _port.getContextValue(Long.class, + Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)) + { + _logger.warn("Connection has taken more than " + + _port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY) + + "ms to establish identity. Closing as possible DoS."); + getEventLogger().message(ConnectionMessages.IDLE_CLOSE()); + closeProtocolSession(); + } _lastReceivedTime = arrivalTime; _lastIoTime = arrivalTime; _readBytes += msg.remaining(); @@ -285,7 +301,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _receivedLock.lock(); try { - final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); + final ArrayList<AMQDataBlock> dataBlocks = _decoder.decodeBuffer(msg); for (AMQDataBlock dataBlock : dataBlocks) { try @@ -479,7 +495,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private synchronized void protocolInitiationReceived(ProtocolInitiation pi) { // this ensures the codec never checks for a PI message again - (_codecFactory.getDecoder()).setExpectProtocolInitiation(false); + _decoder.setExpectProtocolInitiation(false); try { // Log incoming protocol negotiation request @@ -1200,6 +1216,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi throw new IllegalArgumentException("authorizedSubject cannot be null"); } + _authenticated = true; _authorizedSubject.getPrincipals().addAll(authorizedSubject.getPrincipals()); _authorizedSubject.getPrivateCredentials().addAll(authorizedSubject.getPrivateCredentials()); _authorizedSubject.getPublicCredentials().addAll(authorizedSubject.getPublicCredentials()); @@ -1273,7 +1290,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void readerIdle() { - // TODO - enforce disconnect on lack of inbound data + Subject.doAs(_authorizedSubject, new PrivilegedAction<Object>() + { + @Override + public Object run() + { + getEventLogger().message(ConnectionMessages.IDLE_CLOSE()); + _network.close(); + return null; + } + }); } public synchronized void writerIdle() @@ -1344,7 +1370,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } @Override - public Port getPort() + public Port<?> getPort() { return _port; } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java index a2b596e2b1..92552cb011 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java @@ -33,7 +33,6 @@ import org.apache.qpid.framing.ConnectionSecureOkBody; import org.apache.qpid.framing.ConnectionTuneBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQState; @@ -59,7 +58,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener public void methodReceived(AMQStateManager stateManager, ConnectionSecureOkBody body, int channelId) throws AMQException { - Broker broker = stateManager.getBroker(); + Broker<?> broker = stateManager.getBroker(); AMQProtocolSession session = stateManager.getProtocolSession(); SubjectCreator subjectCreator = stateManager.getSubjectCreator(); @@ -99,7 +98,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), - BrokerProperties.FRAME_SIZE, + broker.getContextValue(Long.class, Broker.BROKER_FRAME_SIZE), broker.getConnection_heartBeatDelay()); session.writeFrame(tuneBody.generateFrame(0)); session.setAuthorizedSubject(authResult.getSubject()); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java index dc4f010a66..d6801c0fbc 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java @@ -32,7 +32,6 @@ import org.apache.qpid.framing.ConnectionStartOkBody; import org.apache.qpid.framing.ConnectionTuneBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQState; @@ -59,7 +58,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< public void methodReceived(AMQStateManager stateManager, ConnectionStartOkBody body, int channelId) throws AMQException { - Broker broker = stateManager.getBroker(); + Broker<?> broker = stateManager.getBroker(); AMQProtocolSession session = stateManager.getProtocolSession(); _logger.info("SASL Mechanism selected: " + body.getMechanism()); @@ -113,7 +112,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), - BrokerProperties.FRAME_SIZE, + broker.getContextValue(Long.class,Broker.BROKER_FRAME_SIZE), broker.getConnection_heartBeatDelay()); session.writeFrame(tuneBody.generateFrame(0)); break; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java index 5fddab6576..108c19dbaf 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java @@ -22,8 +22,11 @@ package org.apache.qpid.server.protocol.v0_8.handler; import org.apache.log4j.Logger; +import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ConnectionTuneOkBody; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; @@ -49,8 +52,29 @@ public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<C _logger.debug(body); } stateManager.changeState(AMQState.CONNECTION_NOT_OPENED); + session.initHeartbeats(body.getHeartbeat()); - session.setMaxFrameSize(body.getFrameMax()); + + long brokerFrameMax = stateManager.getBroker().getContextValue(Long.class,Broker.BROKER_FRAME_SIZE); + if(brokerFrameMax != 0 && body.getFrameMax() > brokerFrameMax) + { + throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR, + "Attempt to set max frame size to " + body.getFrameMax() + + "greater than the broker will allow: " + + brokerFrameMax, + body.getClazz(), body.getMethod(), + body.getMajor(), body.getMinor(),null); + } + else if(body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode()) + { + throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR, + "Attempt to set max frame size to " + body.getFrameMax() + + "which is smaller than the specification definined minimum: " + + AMQConstant.FRAME_MIN_SIZE.getCode(), + body.getClazz(), body.getMethod(), + body.getMajor(), body.getMinor(),null); + } + session.setMaxFrameSize(body.getFrameMax()== 0l ? (brokerFrameMax == 0l ? 0xFFFFFFFFl : brokerFrameMax) : body.getFrameMax()); long maxChannelNumber = body.getChannelMax(); //0 means no implied limit, except that forced by protocol limitations (0xFFFF) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java index cb9295ac49..3c1f1dedc3 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java @@ -51,12 +51,12 @@ public class AMQStateManager implements AMQMethodListener { private static final Logger _logger = Logger.getLogger(AMQStateManager.class); - private final Broker _broker; + private final Broker<?> _broker; private final AMQProtocolSession _protocolSession; /** The current state */ private AMQState _currentState; - public AMQStateManager(Broker broker, AMQProtocolSession protocolSession) + public AMQStateManager(Broker<?> broker, AMQProtocolSession protocolSession) { _broker = broker; _protocolSession = protocolSession; @@ -69,7 +69,7 @@ public class AMQStateManager implements AMQMethodListener * * @return the Broker */ - public Broker getBroker() + public Broker<?> getBroker() { return _broker; } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index ffa65b2477..2a48ccb2df 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -62,7 +62,7 @@ import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class Connection_1_0 implements ConnectionEventListener, AMQConnectionModel<Connection_1_0,Session_1_0> { - private final Port _port; + private final Port<?> _port; private final Broker _broker; private final SubjectCreator _subjectCreator; private VirtualHostImpl _vhost; @@ -358,7 +358,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod } @Override - public Port getPort() + public Port<?> getPort() { return _port; } diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js index 11a79984b3..8cc3e76b58 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js @@ -356,20 +356,20 @@ define(["dojo/_base/xhr", that.vhostsGrid = new UpdatableStore(that.brokerData.virtualhostnodes, query(".broker-virtualhosts")[0], [ - { name: "Node Name", field: "name", width: "15%"}, + { name: "Node Name", field: "name", width: "10%"}, { name: "Node State", field: "state", width: "10%"}, { name: "Node Type", field: "type", width: "10%"}, - { name: "Host Name", field: "_item", width: "15%", + { name: "Host Name", field: "_item", width: "10%", formatter: function(item){ return item && item.virtualhosts? item.virtualhosts[0].name: "N/A"; } }, - { name: "Host State", field: "_item", width: "10%", + { name: "Host State", field: "_item", width: "15%", formatter: function(item){ return item && item.virtualhosts? item.virtualhosts[0].state: "N/A"; } }, - { name: "Host Type", field: "_item", width: "10%", + { name: "Host Type", field: "_item", width: "15%", formatter: function(item){ return item && item.virtualhosts? item.virtualhosts[0].type: "N/A"; } diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js index 025390b9ff..59e49f3302 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js @@ -333,7 +333,11 @@ define(["dojo/_base/xhr", "bytesInRateUnits", "msgOutRate", "bytesOutRate", - "bytesOutRateUnits"]); + "bytesOutRateUnits", + "queueFlowResumeSizeBytes", + "queueFlowControlSizeBytes", + "maximumDeliveryAttempts", + "oldestMessageAge"]); @@ -413,6 +417,13 @@ define(["dojo/_base/xhr", { this.messageGroups.style.display = "none"; } + + this.queueFlowControlSizeBytes.innerHTML = entities.encode(String(this.queueData[ "queueFlowControlSizeBytes" ])); + this.queueFlowResumeSizeBytes.innerHTML = entities.encode(String(this.queueData[ "queueFlowResumeSizeBytes" ])); + + this.oldestMessageAge.innerHTML = entities.encode(String(this.queueData[ "oldestMessageAge" ] / 1000)); + var maximumDeliveryAttempts = this.queueData[ "maximumDeliveryAttempts" ]; + this.maximumDeliveryAttempts.innerHTML = entities.encode(String( maximumDeliveryAttempts == 0 ? "" : maximumDeliveryAttempts)); }; QueueUpdater.prototype.update = function() diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html index 52903a80ea..961f60e214 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html @@ -20,22 +20,81 @@ --> <div class="queue"> <div data-dojo-type="dijit.TitlePane" data-dojo-props="title: 'Queue Attributes', open: true"> + <div class="clear"> <div class="formLabel-labelCell">Name:</div> - <div class="name"></div> - </div> - <div class="clear"> - <div class="formLabel-labelCell">State:</div> - <div class="state"></div> + <div class="name formValue-valueCell"></div> </div> <div class="clear"> - <div class="formLabel-labelCell">Durable:</div> - <div class="durable"></div> + <div class="alignLeft"> + <div class="clear"> + <div class="formLabel-labelCell">Type:</div> + <div class="type formValue-valueCell"></div> + <div class="typeQualifier formValue-valueCell"></div> + </div> + <div class="clear"> + <div class="formLabel-labelCell">State:</div> + <div class="state formValue-valueCell"></div> + </div> + <div class="clear"> + <div class="formLabel-labelCell">Durable:</div> + <div class="durable formValue-valueCell"></div> + </div> + <div class="clear"> + <div class="formLabel-labelCell">Lifespan:</div> + <div class="lifetimePolicy formValue-valueCell"></div> + </div> + <div class="clear"> + <div class="formLabel-labelCell">Persist Messages:</div> + <div class="messageDurability formValue-valueCell"></div> + </div> </div> - <div class="clear"> - <div class="formLabel-labelCell">Persist Messages:</div> - <div class="messageDurability"></div> + <div class="alignRight"> + <div> + <div class="formLabel-labelCell">Inbound:</div> + <div class="formValue-valueCell"> + <span class="msgInRate"></span> + <span> msg/s</span> + <span class="bytesInRate"></span> + <span class="bytesInRateUnits"></span> + </div> + </div> + <div> + <div class="formLabel-labelCell">Outbound:</div> + <div class="formValue-valueCell"> + <span class="msgOutRate"></span> + <span> msg/s</span> + <span class="bytesOutRate"></span> + <span class="bytesOutRateUnits"></span> + </div> + </div> + <div> + <div class="formLabel-labelCell">Size:</div> + <div class="formValue-valueCell"> + <span class="queueDepthMessages"></span> + <span> msgs</span> + <span class="queueDepthBytes">(</span> + <span class="queueDepthBytesUnits">)</span> + </div> + </div> + <div> + <div class="formLabel-labelCell">Pre-fetched:</div> + <div class="formValue-valueCell"> + <span class="unacknowledgedMessages"></span> + <span> msgs</span> + <span class="unacknowledgedBytes">(</span> + <span class="unacknowledgedBytesUnits">)</span> + </div> + </div> + <div> + <div class="formLabel-labelCell">Oldest Message Age:</div> + <div class="formValue-valueCell"> + <span class="oldestMessageAge"></span> + <span> secs</span> + </div> + </div> </div> + <div class="clear"></div> <div class="clear"> <div class="formLabel-labelCell">Enforced Max. Ttl(ms):</div> <div class="maximumMessageTtl"></div> @@ -53,55 +112,12 @@ <div class="owner"></div> </div> <div class="clear"> - <div class="formLabel-labelCell">Lifespan:</div> - <div class="lifetimePolicy"></div> - </div> - <div class="clear"> - <div class="formLabel-labelCell">Type:</div> - <div> - <span class="type"></span> - <span class="typeQualifier"></span> - </div> - </div> - <div class="clear"> - <div class="formLabel-labelCell">Size:</div> - <div> - <span class="queueDepthMessages"></span> - <span> msgs</span> - <span class="queueDepthBytes">(</span> - <span class="queueDepthBytesUnits">)</span> - </div> - </div> - <div class="clear"> - <div class="formLabel-labelCell">Pre-fetched:</div> - <div> - <span class="unacknowledgedMessages"></span> - <span> msgs</span> - <span class="unacknowledgedBytes">(</span> - <span class="unacknowledgedBytesUnits">)</span> - </div> - </div> - <div class="clear"> - <div class="formLabel-labelCell">Inbound:</div> - <div> - <span class="msgInRate"></span> - <span> msg/s</span> - <span class="bytesInRate">(</span> - <span class="bytesInRateUnits">)</span> - </div> + <div class="formLabel-labelCell">Alternate Exchange:</div> + <div class="alternateExchange"></div> </div> <div class="clear"> - <div class="formLabel-labelCell">Outbound:</div> - <div> - <span class="msgOutRate"></span> - <span> msg/s</span> - <span class="bytesOutRate">(</span> - <span class="bytesOutRateUnits">)</span> - </div> - </div> - <div class="clear"> - <div class="formLabel-labelCell">AlternateExchange:</div> - <div class="alternateExchange"></div> + <div class="formLabel-labelCell">Maximum Delivery Attempts:</div> + <div class="maximumDeliveryAttempts"></div> </div> <div class="clear messageGroups"> <div class="clear"> @@ -133,6 +149,25 @@ <button data-dojo-type="dijit.form.Button" class="copyMessagesButton" type="button">Copy Messages</button> </div> <br/> + <div data-dojo-type="dijit.TitlePane" data-dojo-props="title: 'Flow Control Settings', open: false"> + <div class="clear"> + <div class="formLabel-labelCell">Capacity:</div> + <div> + <span class="queueFlowControlSizeBytes"></span> + <span>B</span> + </div> + </div> + <div class="clear"> + <div class="formLabel-labelCell">Resume Capacity:</div> + <div> + <span class="queueFlowResumeSizeBytes"></span> + <span>B</span> + </div> + </div> + <div class="clear"></div> + </div> + + <br/> <div data-dojo-type="dijit.TitlePane" data-dojo-props="title: 'Alerting Thresholds', open: false"> <div class="clear"> <div class="formLabel-labelCell">Queue Depth:</div> diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java index e1a0e18262..d76fdf25e6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.client.handler; +import java.util.HashMap; +import java.util.Map; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,9 +32,6 @@ import org.apache.qpid.client.state.AMQMethodNotImplementedException; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.framing.*; -import java.util.HashMap; -import java.util.Map; - public class ClientMethodDispatcherImpl implements MethodDispatcher { @@ -101,6 +101,10 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher } DispatcherFactory factory = _dispatcherFactories.get(version); + if(factory == null) + { + throw new UnsupportedOperationException("The protocol version " + version + " is not supported"); + } return factory.createMethodDispatcher(session); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java index 617380e149..1f2df2026b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java @@ -72,6 +72,8 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener<Con ConnectionTuneOkBody tuneOkBody = methodRegistry.createConnectionTuneOkBody(params.getChannelMax(), params.getFrameMax(), params.getHeartbeat()); + + session.setMaxFrameSize(params.getFrameMax()); // Be aware of possible changes to parameter order as versions change. session.writeFrame(tuneOkBody.generateFrame(channelId)); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index c8ebb7f9c7..08f05cc8d6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -38,7 +38,7 @@ import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; -import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; @@ -165,7 +165,7 @@ public class AMQProtocolHandler implements ProtocolEngine /** Object to lock on when changing the latch */ private Object _failoverLatchChange = new Object(); - private AMQCodecFactory _codecFactory; + private AMQDecoder _decoder; private ProtocolVersion _suggestedProtocolVersion; @@ -188,7 +188,7 @@ public class AMQProtocolHandler implements ProtocolEngine _connection = con; _protocolSession = new AMQProtocolSession(this, _connection); _stateManager = new AMQStateManager(_protocolSession); - _codecFactory = new AMQCodecFactory(false, _protocolSession); + _decoder = new AMQDecoder(false, _protocolSession); _failoverHandler = new FailoverHandler(this); } @@ -443,7 +443,7 @@ public class AMQProtocolHandler implements ProtocolEngine _lastReadTime = System.currentTimeMillis(); try { - final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); + final ArrayList<AMQDataBlock> dataBlocks = _decoder.decodeBuffer(msg); // Decode buffer int size = dataBlocks.size(); @@ -927,4 +927,9 @@ public class AMQProtocolHandler implements ProtocolEngine { _heartbeatListener.heartbeatReceived(); } + + public void setMaxFrameSize(final long frameMax) + { + _decoder.setMaxFrameSize(frameMax == 0l ? 0xffffffffl : frameMax); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 121715d439..2c69aa1b51 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -20,8 +20,16 @@ */ package org.apache.qpid.client.protocol; +import java.nio.ByteBuffer; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import javax.jms.JMSException; +import javax.security.sasl.SaslClient; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; @@ -47,13 +55,6 @@ import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; -import javax.jms.JMSException; -import javax.security.sasl.SaslClient; - -import java.nio.ByteBuffer; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - /** * Wrapper for protocol session that provides type-safe access to session attributes. * <p> @@ -543,4 +544,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { return _connectionStartServerProperties; } + + public void setMaxFrameSize(final long frameMax) + { + _protocolHandler.setMaxFrameSize(frameMax); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java b/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java index 6f99e53055..8d53438bb7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java @@ -20,11 +20,6 @@ */ package org.apache.qpid.client.security; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.util.FileUtils; - import java.io.IOException; import java.io.InputStream; import java.util.Arrays; @@ -39,6 +34,11 @@ import java.util.Set; import java.util.StringTokenizer; import java.util.TreeMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.util.FileUtils; + /** * CallbackHandlerRegistry is a registry for call back handlers for user authentication and interaction during user * authentication. It is capable of reading its configuration from a properties file containing call back handler @@ -75,7 +75,7 @@ public class CallbackHandlerRegistry /** Ordered collection of mechanisms for which callback handlers exist. */ private Collection<String> _mechanisms; - private static final Collection<String> MECHS_THAT_NEED_USERPASS = Arrays.asList(new String [] {"PLAIN", "AMQPLAIN", "CRAM-MD5","CRAM-MD5-HASHED"}); + private static final Collection<String> MECHS_THAT_NEED_USERPASS = Arrays.asList(new String [] {"PLAIN", "AMQPLAIN", "CRAM-MD5","CRAM-MD5-HASHED", "SCRAM-SHA-1", "SCRAM-SHA-256"}); static { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties b/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties index 8f02ee2c38..b77fa033d6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties @@ -26,11 +26,11 @@ EXTERNAL.1=org.apache.qpid.client.security.UsernamePasswordCallbackHandler GSSAPI.2=org.apache.qpid.client.security.UsernamePasswordCallbackHandler -CRAM-MD5-HASHED.3=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler -CRAM-MD5.4=org.apache.qpid.client.security.UsernamePasswordCallbackHandler -AMQPLAIN.5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler -PLAIN.6=org.apache.qpid.client.security.UsernamePasswordCallbackHandler -ANONYMOUS.7=org.apache.qpid.client.security.UsernamePasswordCallbackHandler -SCRAM-SHA-1.8=org.apache.qpid.client.security.UsernamePasswordCallbackHandler -SCRAM-SHA-256.9=org.apache.qpid.client.security.UsernamePasswordCallbackHandler +SCRAM-SHA-256.3=org.apache.qpid.client.security.UsernamePasswordCallbackHandler +SCRAM-SHA-1.4=org.apache.qpid.client.security.UsernamePasswordCallbackHandler +CRAM-MD5-HASHED.5=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler +CRAM-MD5.6=org.apache.qpid.client.security.UsernamePasswordCallbackHandler +PLAIN.7=org.apache.qpid.client.security.UsernamePasswordCallbackHandler +AMQPLAIN.8=org.apache.qpid.client.security.UsernamePasswordCallbackHandler +ANONYMOUS.9=org.apache.qpid.client.security.UsernamePasswordCallbackHandler diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java deleted file mode 100644 index 220e33724a..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.codec; - -import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; - -/** - * AMQCodecFactory is a Mina codec factory. It supplies the encoders and decoders need to read and write the bytes to - * the wire. - */ -public class AMQCodecFactory -{ - - /** Holds the protocol decoder. */ - private final AMQDecoder _frameDecoder; - - /** - * Creates a new codec factory, specifiying whether it is expected that the first frame of data should be an - * initiation. This is the case for the broker, which always expects to received the protocol initiation on a newly - * connected client. - * - * @param expectProtocolInitiation <tt>true</tt> if the first frame received is going to be a protocol initiation - * frame, <tt>false</tt> if it is going to be a standard AMQ data block. - * @param session protocol session (connection) - */ - public AMQCodecFactory(boolean expectProtocolInitiation, AMQVersionAwareProtocolSession session) - { - _frameDecoder = new AMQDecoder(expectProtocolInitiation, session); - } - - - /** - * Gets the AMQP decoder. - * - * @return The AMQP decoder. - */ - public AMQDecoder getDecoder() - { - return _frameDecoder; - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java index 3ccd7e2031..ebecb7b483 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java @@ -20,6 +20,16 @@ */ package org.apache.qpid.codec; +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; + import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQDataBlockDecoder; import org.apache.qpid.framing.AMQFrameDecodingException; @@ -31,16 +41,6 @@ import org.apache.qpid.framing.EncodingUtils; import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; -import java.io.ByteArrayInputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.ListIterator; - /** * AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a * protocol initiation decoder. It is a cumulative decoder, which means that it can accumulate data to decode in the @@ -66,6 +66,8 @@ public class AMQDecoder private AMQMethodBodyFactory _bodyFactory; + private boolean _firstRead = true; + private List<ByteArrayInputStream> _remainingBufs = new ArrayList<ByteArrayInputStream>(); /** @@ -94,6 +96,11 @@ public class AMQDecoder _expectProtocolInitiation = expectProtocolInitiation; } + public void setMaxFrameSize(final long frameMax) + { + _dataBlockDecoder.setMaxFrameSize(frameMax); + } + private class RemainingByteArrayInputStream extends InputStream { private int _currentListPos; @@ -234,6 +241,17 @@ public class AMQDecoder msg = new ByteArrayDataInput(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining()); } + // If this is the first read then we may be getting a protocol initiation back if we tried to negotiate + // an unsupported version + if(_firstRead && buf.hasRemaining()) + { + _firstRead = false; + if(!_expectProtocolInitiation && buf.get(buf.position()) > 8) + { + _expectProtocolInitiation = true; + } + } + boolean enoughData = true; while (enoughData) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index 9d5e654ad0..d00ddf4074 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -20,12 +20,13 @@ */ package org.apache.qpid.framing; +import java.io.IOException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.codec.MarkableDataInput; - -import java.io.IOException; +import org.apache.qpid.protocol.AMQConstant; public class AMQDataBlockDecoder { @@ -40,6 +41,7 @@ public class AMQDataBlockDecoder } private Logger _logger = LoggerFactory.getLogger(AMQDataBlockDecoder.class); + private long _maxFrameSize = AMQConstant.FRAME_MIN_SIZE.getCode(); public AMQDataBlockDecoder() { } @@ -59,14 +61,17 @@ public class AMQDataBlockDecoder // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt() final long bodySize = in.readInt() & 0xffffffffL; - + if(bodySize > _maxFrameSize) + { + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Incoming frame size of "+bodySize+" is larger than negotiated maximum of " + _maxFrameSize); + } in.reset(); return (remainingAfterAttributes >= bodySize); } - public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, MarkableDataInput in) + public AMQFrame createAndPopulateFrame(BodyFactory methodBodyFactory, MarkableDataInput in) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException { final byte type = in.readByte(); @@ -83,7 +88,7 @@ public class AMQDataBlockDecoder if (bodyFactory == null) { - throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null); + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Unsupported frame type: " + type); } final int channel = in.readUnsignedShort(); @@ -92,8 +97,8 @@ public class AMQDataBlockDecoder // bodySize can be zero if ((channel < 0) || (bodySize < 0)) { - throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type + " channel = " + channel - + " bodySize = " + bodySize, null); + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Undecodable frame: type = " + type + " channel = " + channel + + " bodySize = " + bodySize); } AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory); @@ -101,11 +106,15 @@ public class AMQDataBlockDecoder byte marker = in.readByte(); if ((marker & 0xFF) != 0xCE) { - throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize - + " type=" + type, null); + throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "End of frame marker not found. Read " + marker + " length=" + bodySize + + " type=" + type); } return frame; } + public void setMaxFrameSize(final long maxFrameSize) + { + _maxFrameSize = maxFrameSize; + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java index b0c92d9aab..b55a48067d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java @@ -20,17 +20,12 @@ */ package org.apache.qpid.framing; -import org.apache.qpid.AMQException; +import org.apache.qpid.transport.TransportException; -/** - * AMQProtocolHeaderException indicates a format error in an AMQP frame header. - * <p> - * TODO Not an AMQP exception as no status code. - */ -public class AMQProtocolHeaderException extends AMQException +public class AMQProtocolHeaderException extends TransportException { public AMQProtocolHeaderException(String message, Throwable cause) { - super(null, message, cause); + super(message, cause); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index d48cd1754c..1866e1fd15 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -20,20 +20,21 @@ */ package org.apache.qpid.transport; -import org.apache.qpid.common.QpidProperties; -import org.apache.qpid.configuration.ClientProperties; -import org.apache.qpid.properties.ConnectionStartProperties; -import org.apache.qpid.transport.util.Logger; - import static org.apache.qpid.transport.Connection.State.OPEN; import static org.apache.qpid.transport.Connection.State.RESUMING; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; import java.util.HashMap; import java.util.List; import java.util.Map; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; + +import org.apache.qpid.common.QpidProperties; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.properties.ConnectionStartProperties; +import org.apache.qpid.transport.util.Logger; + /** * ClientDelegate @@ -138,13 +139,24 @@ public class ClientDelegate extends ConnectionDelegate int actualHeartbeatInterval = calculateHeartbeatInterval(heartbeatInterval, tune.getHeartbeatMin(), tune.getHeartbeatMax()); + int maxFrameSize = tune.getMaxFrameSize(); + int settingsMaxFrameSize = conn.getConnectionSettings().getMaxFrameSize(); + if(maxFrameSize == 0 && settingsMaxFrameSize != 0 && settingsMaxFrameSize < 0xffff) + { + maxFrameSize = Math.max(Constant.MIN_MAX_FRAME_SIZE, settingsMaxFrameSize); + } + else if(maxFrameSize != 0 && settingsMaxFrameSize != 0) + { + maxFrameSize = Math.max(Constant.MIN_MAX_FRAME_SIZE, Math.min(maxFrameSize, settingsMaxFrameSize)); + } conn.connectionTuneOk(tune.getChannelMax(), - tune.getMaxFrameSize(), + maxFrameSize, actualHeartbeatInterval); int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor); conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor)); conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval); + conn.setMaxFrameSize(maxFrameSize == 0 ? 0xffff : maxFrameSize); conn.setIdleTimeout(idleTimeout); int channelMax = tune.getChannelMax(); @@ -183,7 +195,7 @@ public class ClientDelegate extends ConnectionDelegate /** * Currently the spec specified the min and max for heartbeat using secs */ - private int calculateHeartbeatInterval(int heartbeat,int min, int max) + int calculateHeartbeatInterval(int heartbeat,int min, int max) { int i = heartbeat; if (i == 0) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 7c604e8e8e..44cb30e735 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -20,23 +20,12 @@ */ package org.apache.qpid.transport; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.transport.network.*; -import org.apache.qpid.transport.network.security.SecurityLayer; -import org.apache.qpid.transport.network.security.SecurityLayerFactory; -import org.apache.qpid.transport.util.Logger; -import org.apache.qpid.transport.util.Waiter; -import org.apache.qpid.util.Strings; - import static org.apache.qpid.transport.Connection.State.CLOSED; import static org.apache.qpid.transport.Connection.State.CLOSING; import static org.apache.qpid.transport.Connection.State.NEW; import static org.apache.qpid.transport.Connection.State.OPEN; import static org.apache.qpid.transport.Connection.State.OPENING; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslServer; - import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -48,6 +37,23 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslServer; + +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.transport.network.Assembler; +import org.apache.qpid.transport.network.Disassembler; +import org.apache.qpid.transport.network.InputHandler; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.Transport; +import org.apache.qpid.transport.network.TransportActivity; +import org.apache.qpid.transport.network.security.SecurityLayer; +import org.apache.qpid.transport.network.security.SecurityLayerFactory; +import org.apache.qpid.transport.util.Logger; +import org.apache.qpid.transport.util.Waiter; +import org.apache.qpid.util.Strings; + /** * Connection @@ -71,7 +77,7 @@ public class Connection extends ConnectionInvoker private long _lastSendTime; private long _lastReadTime; private NetworkConnection _networkConnection; - + private FrameSizeObserver _frameSizeObserver; public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING } @@ -224,7 +230,9 @@ public class Connection extends ConnectionInvoker securityLayer = SecurityLayerFactory.newInstance(getConnectionSettings()); OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10); - Receiver<ByteBuffer> secureReceiver = securityLayer.receiver(new InputHandler(new Assembler(this))); + final InputHandler inputHandler = new InputHandler(new Assembler(this)); + addFrameSizeObserver(inputHandler); + Receiver<ByteBuffer> secureReceiver = securityLayer.receiver(inputHandler); if(secureReceiver instanceof ConnectionListener) { addConnectionListener((ConnectionListener)secureReceiver); @@ -241,7 +249,9 @@ public class Connection extends ConnectionInvoker { addConnectionListener((ConnectionListener)secureSender); } - sender = new Disassembler(secureSender, settings.getMaxFrameSize()); + Disassembler disassembler = new Disassembler(secureSender, Constant.MIN_MAX_FRAME_SIZE); + sender = disassembler; + addFrameSizeObserver(disassembler); send(new ProtocolHeader(1, 0, 10)); @@ -809,4 +819,33 @@ public class Connection extends ConnectionInvoker { return _networkConnection; } + + public void setMaxFrameSize(final int maxFrameSize) + { + if(_frameSizeObserver != null) + { + _frameSizeObserver.setMaxFrameSize(maxFrameSize); + } + } + + public void addFrameSizeObserver(final FrameSizeObserver frameSizeObserver) + { + if(_frameSizeObserver == null) + { + _frameSizeObserver = frameSizeObserver; + } + else + { + final FrameSizeObserver currentObserver = _frameSizeObserver; + _frameSizeObserver = new FrameSizeObserver() + { + @Override + public void setMaxFrameSize(final int frameSize) + { + currentObserver.setMaxFrameSize(frameSize); + frameSizeObserver.setMaxFrameSize(frameSize); + } + }; + } + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/FrameSizeObserver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/FrameSizeObserver.java new file mode 100644 index 0000000000..94d0080fbb --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/FrameSizeObserver.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +public interface FrameSizeObserver +{ + void setMaxFrameSize(int frameSize); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java index 1e0d5b9698..82a677b8f7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java @@ -20,18 +20,19 @@ */ package org.apache.qpid.transport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import static org.apache.qpid.transport.Connection.State.OPEN; -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; import java.util.Collections; import java.util.List; import java.util.Map; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * ServerDelegate */ @@ -136,12 +137,14 @@ public class ServerDelegate extends ConnectionDelegate protected void tuneAuthorizedConnection(final Connection conn) { - conn.connectionTune - (getChannelMax(), - org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE, - 0, getHeartbeatMax()); + conn.connectionTune(getChannelMax(), getFrameMax(), 0, getHeartbeatMax()); } - + + protected int getFrameMax() + { + return org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE; + } + protected void secure(final Connection conn, final byte[] response) { final SaslServer ss = conn.getSaslServer(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java index 5a5de597c2..26e8f1850b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java @@ -20,17 +20,18 @@ */ package org.apache.qpid.transport.network; +import java.nio.ByteBuffer; + import org.apache.qpid.transport.Binding; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ConnectionDelegate; import org.apache.qpid.transport.ConnectionListener; +import org.apache.qpid.transport.Constant; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.security.sasl.SASLReceiver; import org.apache.qpid.transport.network.security.sasl.SASLSender; -import java.nio.ByteBuffer; - /** * ConnectionBinding * @@ -80,23 +81,26 @@ public abstract class ConnectionBinding } // XXX: hardcoded max-frame - Disassembler dis = new Disassembler(sender, MAX_FRAME_SIZE); + Disassembler dis = new Disassembler(sender, Constant.MIN_MAX_FRAME_SIZE); + conn.addFrameSizeObserver(dis); conn.setSender(dis); return conn; } public Receiver<ByteBuffer> receiver(Connection conn) { - if (conn.getConnectionSettings() != null && + final InputHandler inputHandler = new InputHandler(new Assembler(conn)); + conn.addFrameSizeObserver(inputHandler); + if (conn.getConnectionSettings() != null && conn.getConnectionSettings().isUseSASLEncryption()) { - SASLReceiver receiver = new SASLReceiver(new InputHandler(new Assembler(conn))); - conn.addConnectionListener((ConnectionListener)receiver); + SASLReceiver receiver = new SASLReceiver(inputHandler); + conn.addConnectionListener(receiver); return receiver; } else { - return new InputHandler(new Assembler(conn)); + return inputHandler; } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java index fe437ecf93..a804cb2f9d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java @@ -20,6 +20,17 @@ */ package org.apache.qpid.transport.network; +import static java.lang.Math.min; +import static org.apache.qpid.transport.network.Frame.FIRST_FRAME; +import static org.apache.qpid.transport.network.Frame.FIRST_SEG; +import static org.apache.qpid.transport.network.Frame.HEADER_SIZE; +import static org.apache.qpid.transport.network.Frame.LAST_FRAME; +import static org.apache.qpid.transport.network.Frame.LAST_SEG; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import org.apache.qpid.transport.FrameSizeObserver; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.ProtocolDelegate; @@ -31,24 +42,13 @@ import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.Struct; import org.apache.qpid.transport.codec.BBEncoder; -import static org.apache.qpid.transport.network.Frame.FIRST_FRAME; -import static org.apache.qpid.transport.network.Frame.FIRST_SEG; -import static org.apache.qpid.transport.network.Frame.HEADER_SIZE; -import static org.apache.qpid.transport.network.Frame.LAST_FRAME; -import static org.apache.qpid.transport.network.Frame.LAST_SEG; - -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - -import static java.lang.Math.min; - /** * Disassembler */ -public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void> +public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void>, FrameSizeObserver { private final Sender<ByteBuffer> sender; - private final int maxPayload; + private int maxPayload; private final Object sendlock = new Object(); private final static ThreadLocal<BBEncoder> _encoder = new ThreadLocal<BBEncoder>() { @@ -60,11 +60,11 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega public Disassembler(Sender<ByteBuffer> sender, int maxFrame) { + this.sender = sender; if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024) { throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame); } - this.sender = sender; this.maxPayload = maxFrame - HEADER_SIZE; } @@ -255,4 +255,15 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega { sender.setIdleTimeout(i); } + + @Override + public void setMaxFrameSize(final int maxFrame) + { + if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024) + { + throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame); + } + this.maxPayload = maxFrame - HEADER_SIZE; + + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java index 9416c4c9fa..e810d9e8ae 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java @@ -20,12 +20,12 @@ */ package org.apache.qpid.transport.network; -import org.apache.qpid.transport.SegmentType; - import static org.apache.qpid.transport.util.Functions.str; import java.nio.ByteBuffer; +import org.apache.qpid.transport.SegmentType; + /** * Frame diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java index 86e05db818..758c2e1eda 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java @@ -20,11 +20,6 @@ */ package org.apache.qpid.transport.network; -import org.apache.qpid.transport.ProtocolError; -import org.apache.qpid.transport.ProtocolHeader; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.SegmentType; - import static org.apache.qpid.transport.network.InputHandler.State.ERROR; import static org.apache.qpid.transport.network.InputHandler.State.FRAME_BODY; import static org.apache.qpid.transport.network.InputHandler.State.FRAME_HDR; @@ -34,6 +29,13 @@ import static org.apache.qpid.transport.util.Functions.str; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import org.apache.qpid.transport.Constant; +import org.apache.qpid.transport.FrameSizeObserver; +import org.apache.qpid.transport.ProtocolError; +import org.apache.qpid.transport.ProtocolHeader; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.SegmentType; + /** * InputHandler @@ -41,15 +43,17 @@ import java.nio.ByteOrder; * @author Rafael H. Schloming */ -public class InputHandler implements Receiver<ByteBuffer> +public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver { + private int _maxFrameSize = Constant.MIN_MAX_FRAME_SIZE; + public enum State { PROTO_HDR, FRAME_HDR, FRAME_BODY, - ERROR; + ERROR } private final Receiver<NetworkEvent> receiver; @@ -83,6 +87,11 @@ public class InputHandler implements Receiver<ByteBuffer> this(receiver, PROTO_HDR); } + public void setMaxFrameSize(final int maxFrameSize) + { + _maxFrameSize = maxFrameSize; + } + private void error(String fmt, Object ... args) { receiver.received(new ProtocolError(Frame.L1, fmt, args)); @@ -158,7 +167,8 @@ public class InputHandler implements Receiver<ByteBuffer> type = SegmentType.get(input.get(pos + 1)); int size = (0xFFFF & input.getShort(pos + 2)); size -= Frame.HEADER_SIZE; - if (size < 0 || size > (64*1024 - 12)) + _maxFrameSize = 64 * 1024; + if (size < 0 || size > (_maxFrameSize - 12)) { error("bad frame size: %d", size); return ERROR; diff --git a/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java b/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java index cb820b333b..cd810f6b3d 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java @@ -21,6 +21,12 @@ package org.apache.qpid.codec; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; + import junit.framework.TestCase; import org.apache.qpid.framing.AMQDataBlock; @@ -29,23 +35,15 @@ import org.apache.qpid.framing.AMQFrameDecodingException; import org.apache.qpid.framing.AMQProtocolVersionException; import org.apache.qpid.framing.HeartbeatBody; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; - public class AMQDecoderTest extends TestCase { - private AMQCodecFactory _factory; private AMQDecoder _decoder; public void setUp() { - _factory = new AMQCodecFactory(false, null); - _decoder = _factory.getDecoder(); + _decoder = new AMQDecoder(false, null); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java index de2b594211..c771e84f52 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.protocol; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.net.InetSocketAddress; @@ -32,6 +34,7 @@ import java.util.Set; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -52,6 +55,7 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase _broker = BrokerTestHelper.createBrokerMock(); when(_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST)).thenReturn("default"); when(_broker.getDefaultVirtualHost()).thenReturn("default"); + when(_broker.getContextValue(eq(Long.class), eq(Broker.BROKER_FRAME_SIZE))).thenReturn(0xffffl); } @@ -149,8 +153,10 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase { Set<Protocol> protocols = getAllAMQPProtocols(); + Port<?> port = mock(Port.class); + when(port.getContextValue(eq(Long.class),eq(Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))).thenReturn(10000l); MultiVersionProtocolEngineFactory factory = - new MultiVersionProtocolEngineFactory(_broker, null, false, false, protocols, null, null, + new MultiVersionProtocolEngineFactory(_broker, null, false, false, protocols, null, port, org.apache.qpid.server.model.Transport.TCP); //create a dummy to retrieve the 'current' ID number diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/transport/MaxFrameSizeTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/transport/MaxFrameSizeTest.java new file mode 100644 index 0000000000..322b971487 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/transport/MaxFrameSizeTest.java @@ -0,0 +1,349 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import javax.naming.NamingException; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; + +import org.apache.qpid.codec.MarkableDataInput; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlockDecoder; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.AMQProtocolVersionException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BodyFactory; +import org.apache.qpid.framing.ByteArrayDataInput; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ConnectionStartOkBody; +import org.apache.qpid.framing.ConnectionTuneOkBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.amqp_0_91.ConnectionStartOkBodyImpl; +import org.apache.qpid.framing.amqp_0_91.ConnectionTuneOkBodyImpl; +import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.server.model.AuthenticationProvider; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Protocol; +import org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_8; +import org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_9; +import org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_9_1; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.test.utils.TestBrokerConfiguration; + +public class MaxFrameSizeTest extends QpidBrokerTestCase +{ + + @Override + public void setUp() throws Exception + { + // don't call super.setup() as we want a change to set stuff up before the broker starts + // super.setUp(); + } + + public void testTooSmallFrameSize() throws Exception + { + + getBrokerConfiguration().setObjectAttribute(AuthenticationProvider.class, + TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER, + "secureOnlyMechanisms", + "[]"); + super.setUp(); + + if(isBroker010()) + { + Connection conn = new Connection(); + final ConnectionSettings settings = new ConnectionSettings(); + BrokerDetails brokerDetails = getConnectionFactory().getConnectionURL().getAllBrokerDetails().get(0); + settings.setHost(brokerDetails.getHost()); + settings.setPort(brokerDetails.getPort()); + settings.setUsername(GUEST_USERNAME); + settings.setPassword(GUEST_PASSWORD); + final ConnectionDelegate clientDelegate = new TestClientDelegate(settings, 1024); + conn.setConnectionDelegate(clientDelegate); + try + { + conn.connect(settings); + fail("Connection should not be possible with a frame size < " + Constant.MIN_MAX_FRAME_SIZE); + } + catch(ConnectionException e) + { + // pass + } + + } + else + { + doAMQP08test(1024, new ResultEvaluator() + { + + @Override + public void evaluate(final Socket socket, final List<AMQFrame> frames) + { + if(!socket.isClosed()) + { + AMQFrame lastFrame = frames.get(frames.size() - 1); + assertTrue("Connection should not be possible with a frame size < " + Constant.MIN_MAX_FRAME_SIZE, lastFrame.getBodyFrame() instanceof ConnectionCloseBody); + } + } + }); + } + } + + + public void testTooLargeFrameSize() throws Exception + { + getBrokerConfiguration().setObjectAttribute(AuthenticationProvider.class, + TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER, + "secureOnlyMechanisms", + "[]"); + setTestSystemProperty(Broker.BROKER_FRAME_SIZE, "8192"); + super.setUp(); + if(isBroker010()) + { + Connection conn = new Connection(); + final ConnectionSettings settings = new ConnectionSettings(); + BrokerDetails brokerDetails = getConnectionFactory().getConnectionURL().getAllBrokerDetails().get(0); + settings.setHost(brokerDetails.getHost()); + settings.setPort(brokerDetails.getPort()); + settings.setUsername(GUEST_USERNAME); + settings.setPassword(GUEST_PASSWORD); + final ConnectionDelegate clientDelegate = new TestClientDelegate(settings, 0xffff); + conn.setConnectionDelegate(clientDelegate); + try + { + conn.connect(settings); + fail("Connection should not be possible with a frame size larger than the broker requested"); + } + catch(ConnectionException e) + { + // pass + } + + } + else + { + doAMQP08test(10000, new ResultEvaluator() + { + + @Override + public void evaluate(final Socket socket, final List<AMQFrame> frames) + { + if(!socket.isClosed()) + { + AMQFrame lastFrame = frames.get(frames.size() - 1); + assertTrue("Connection should not be possible with a frame size larger than the broker requested", lastFrame.getBodyFrame() instanceof ConnectionCloseBody); + } + } + }); + } + } + + private static interface ResultEvaluator + { + void evaluate(Socket socket, List<AMQFrame> frames); + } + + private void doAMQP08test(int frameSize, ResultEvaluator evaluator) + throws NamingException, IOException, AMQFrameDecodingException, AMQProtocolVersionException + { + BrokerDetails brokerDetails = getConnectionFactory().getConnectionURL().getAllBrokerDetails().get(0); + + Socket socket = new Socket(brokerDetails.getHost(), brokerDetails.getPort()); + socket.setTcpNoDelay(true); + OutputStream os = socket.getOutputStream(); + + byte[] protocolHeader; + Protocol protocol = getBrokerProtocol(); + switch(protocol) + { + case AMQP_0_8: + protocolHeader = (ProtocolEngineCreator_0_8.getInstance().getHeaderIdentifier()); + break; + case AMQP_0_9: + protocolHeader = (ProtocolEngineCreator_0_9.getInstance().getHeaderIdentifier()); + break; + case AMQP_0_9_1: + protocolHeader = (ProtocolEngineCreator_0_9_1.getInstance().getHeaderIdentifier()); + break; + default: + throw new RuntimeException("Unexpected Protocol Version: " + protocol); + } + os.write(protocolHeader); + InputStream is = socket.getInputStream(); + + final byte[] response = new byte[2+GUEST_USERNAME.length()+GUEST_PASSWORD.length()]; + int i = 1; + for(byte b : GUEST_USERNAME.getBytes(StandardCharsets.US_ASCII)) + { + response[i++] = b; + } + i++; + for(byte b : GUEST_PASSWORD.getBytes(StandardCharsets.US_ASCII)) + { + response[i++] = b; + } + + ConnectionStartOkBody startOK = new ConnectionStartOkBodyImpl(new FieldTable(), AMQShortString.valueOf("PLAIN"), response, AMQShortString.valueOf("en_US")); + + DataOutputStream dos = new DataOutputStream(os); + new AMQFrame(0, startOK).writePayload(dos); + dos.flush(); + ConnectionTuneOkBody tuneOk = new ConnectionTuneOkBodyImpl(256, frameSize, 0); + new AMQFrame(0, tuneOk).writePayload(dos); + dos.flush(); + socket.setSoTimeout(5000); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte[] buffer = new byte[1024]; + int size; + while((size = is.read(buffer)) > 0) + { + baos.write(buffer,0,size); + } + + byte[] serverData = baos.toByteArray(); + ByteArrayDataInput badi = new ByteArrayDataInput(serverData); + AMQDataBlockDecoder datablockDecoder = new AMQDataBlockDecoder(); + final MethodRegistry_0_91 methodRegistry_0_91 = new MethodRegistry_0_91(); + BodyFactory methodBodyFactory = new BodyFactory() + { + @Override + public AMQBody createBody(final MarkableDataInput in, final long bodySize) + throws AMQFrameDecodingException, IOException + { + return methodRegistry_0_91.convertToBody(in, bodySize); + } + }; + + List<AMQFrame> frames = new ArrayList<>(); + while (datablockDecoder.decodable(badi)) + { + frames.add(datablockDecoder.createAndPopulateFrame(methodBodyFactory, badi)); + } + + evaluator.evaluate(socket, frames); + } + + private static class TestClientDelegate extends ClientDelegate + { + + private final int _maxFrameSize; + + public TestClientDelegate(final ConnectionSettings settings, final int maxFrameSize) + { + super(settings); + _maxFrameSize = maxFrameSize; + } + + @Override + protected SaslClient createSaslClient(final List<Object> brokerMechs) throws ConnectionException, SaslException + { + final CallbackHandler handler = new CallbackHandler() + { + @Override + public void handle(final Callback[] callbacks) throws IOException, UnsupportedCallbackException + { + for (int i = 0; i < callbacks.length; i++) + { + Callback cb = callbacks[i]; + if (cb instanceof NameCallback) + { + ((NameCallback)cb).setName(GUEST_USERNAME); + } + else if (cb instanceof PasswordCallback) + { + ((PasswordCallback)cb).setPassword(GUEST_PASSWORD.toCharArray()); + } + else + { + throw new UnsupportedCallbackException(cb); + } + } + + } + }; + String[] selectedMechs = {}; + for(String mech : new String[] { "ANONYMOUS", "PLAIN", "CRAM-MD5", "SCRAM-SHA-1", "SCRAM-SHA-256"}) + { + if(brokerMechs.contains(mech)) + { + selectedMechs = new String[] {mech}; + break; + } + } + + + return Sasl.createSaslClient(selectedMechs, + null, + getConnectionSettings().getSaslProtocol(), + getConnectionSettings().getSaslServerName(), + Collections.<String,Object>emptyMap(), + handler); + + } + + @Override + public void connectionTune(Connection conn, ConnectionTune tune) + { + int heartbeatInterval = getConnectionSettings().getHeartbeatInterval010(); + float heartbeatTimeoutFactor = getConnectionSettings().getHeartbeatTimeoutFactor(); + int actualHeartbeatInterval = calculateHeartbeatInterval(heartbeatInterval, + tune.getHeartbeatMin(), + tune.getHeartbeatMax()); + + conn.connectionTuneOk(tune.getChannelMax(), + _maxFrameSize, + actualHeartbeatInterval); + + int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor); + conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor)); + conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval); + conn.setMaxFrameSize(_maxFrameSize); + + + conn.setIdleTimeout(idleTimeout); + + int channelMax = tune.getChannelMax(); + conn.setChannelMax(channelMax == 0 ? Connection.MAX_CHANNEL_MAX : channelMax); + + conn.connectionOpen(getConnectionSettings().getVhost(), null, Option.INSIST); + } + + } +} |