diff options
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10')
3 files changed, 79 insertions, 30 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index cefd1ee0b2..dc60a37a7f 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -20,27 +20,32 @@ */ package org.apache.qpid.server.protocol.v0_10; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.security.AccessController; +import java.security.PrivilegedAction; + +import javax.security.auth.Subject; + +import org.apache.log4j.Logger; + import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.transport.Constant; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.Assembler; import org.apache.qpid.transport.network.Disassembler; import org.apache.qpid.transport.network.InputHandler; import org.apache.qpid.transport.network.NetworkConnection; -import javax.security.auth.Subject; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.security.AccessController; -import java.security.PrivilegedAction; - public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocolEngine { public static final int MAX_FRAME_SIZE = 64 * 1024 - 1; + private static final Logger _logger = Logger.getLogger(ProtocolEngine_0_10.class); + private NetworkConnection _network; private long _readBytes; @@ -87,7 +92,9 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol _network = network; _connection.setNetworkConnection(network); - _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE)); + Disassembler disassembler = new Disassembler(wrapSender(sender), Constant.MIN_MAX_FRAME_SIZE); + _connection.setSender(disassembler); + _connection.addFrameSizeObserver(disassembler); // FIXME Two log messages to maintain compatibility with earlier protocol versions _connection.getEventLogger().message(ConnectionMessages.OPEN(null, "0-10", null, null, false, true, false, false)); @@ -154,6 +161,26 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol public void received(final ByteBuffer buf) { _lastReadTime = System.currentTimeMillis(); + if(_connection.getAuthorizedPrincipal() == null && + (_lastReadTime - _createTime) > _connection.getPort().getContextValue(Long.class, + Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY) ) + { + Subject.doAs(_connection.getAuthorizedSubject(), new PrivilegedAction<Object>() + { + @Override + public Object run() + { + + _logger.warn("Connection has taken more than " + + _connection.getPort() + .getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY) + + "ms to establish identity. Closing as possible DoS."); + _connection.getEventLogger().message(ConnectionMessages.IDLE_CLOSE()); + _network.close(); + return null; + } + }); + } super.received(buf); _connection.receivedComplete(); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 2ad79ad980..8ddd04f51a 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -75,7 +75,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S private final long _connectionId; private final Object _reference = new Object(); private VirtualHostImpl _virtualHost; - private Port _port; + private Port<?> _port; private AtomicLong _lastIoTime = new AtomicLong(); private boolean _blocking; private Transport _transport; @@ -189,12 +189,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S } @Override - public Port getPort() + public Port<?> getPort() { return _port; } - public void setPort(Port port) + public void setPort(Port<?> port) { _port = port; } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index 7751ff765d..bab2d802e8 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -50,21 +50,7 @@ import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import org.apache.qpid.transport.Binary; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.ConnectionClose; -import org.apache.qpid.transport.ConnectionCloseCode; -import org.apache.qpid.transport.ConnectionOpen; -import org.apache.qpid.transport.ConnectionOpenOk; -import org.apache.qpid.transport.ConnectionStartOk; -import org.apache.qpid.transport.ConnectionTuneOk; -import org.apache.qpid.transport.ServerDelegate; -import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.SessionAttach; -import org.apache.qpid.transport.SessionDelegate; -import org.apache.qpid.transport.SessionDetach; -import org.apache.qpid.transport.SessionDetachCode; -import org.apache.qpid.transport.SessionDetached; +import org.apache.qpid.transport.*; import org.apache.qpid.transport.network.NetworkConnection; public class ServerConnectionDelegate extends ServerDelegate @@ -76,15 +62,16 @@ public class ServerConnectionDelegate extends ServerDelegate private int _maxNoOfChannels; private Map<String,Object> _clientProperties; private final SubjectCreator _subjectCreator; + private int _maximumFrameSize; - public ServerConnectionDelegate(Broker broker, String localFQDN, SubjectCreator subjectCreator) + public ServerConnectionDelegate(Broker<?> broker, String localFQDN, SubjectCreator subjectCreator) { this(createConnectionProperties(broker), Collections.singletonList((Object)"en_US"), broker, localFQDN, subjectCreator); } private ServerConnectionDelegate(Map<String, Object> properties, List<Object> locales, - Broker broker, + Broker<?> broker, String localFQDN, SubjectCreator subjectCreator) { @@ -94,9 +81,10 @@ public class ServerConnectionDelegate extends ServerDelegate _localFQDN = localFQDN; _maxNoOfChannels = broker.getConnection_sessionCountLimit(); _subjectCreator = subjectCreator; + _maximumFrameSize = (int) Math.min(0xffffl, broker.getContextValue(Long.class, Broker.BROKER_FRAME_SIZE)); } - private static List<String> getFeatures(Broker broker) + private static List<String> getFeatures(Broker<?> broker) { String brokerDisabledFeatures = System.getProperty(BrokerProperties.PROPERTY_DISABLED_FEATURES); final List<String> features = new ArrayList<String>(); @@ -108,7 +96,7 @@ public class ServerConnectionDelegate extends ServerDelegate return Collections.unmodifiableList(features); } - private static Map<String, Object> createConnectionProperties(final Broker broker) + private static Map<String, Object> createConnectionProperties(final Broker<?> broker) { final Map<String,Object> map = new HashMap<String,Object>(); // Federation tag is used by the client to identify the broker instance @@ -234,6 +222,7 @@ public class ServerConnectionDelegate extends ServerDelegate { ServerConnection sconn = (ServerConnection) conn; int okChannelMax = ok.getChannelMax(); + int okMaxFrameSize = ok.getMaxFrameSize(); if (okChannelMax > getChannelMax()) { @@ -246,6 +235,31 @@ public class ServerConnectionDelegate extends ServerDelegate return; } + if(okMaxFrameSize > getFrameMax()) + { + LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " + + "client connectionTuneOk returned a frameMax (" + okMaxFrameSize + + ") above the server's offered limit (" + getFrameMax() +")"); + + //Due to the error we must forcefully close the connection without negotiation + sconn.getSender().close(); + return; + } + else if(okMaxFrameSize > 0 && okMaxFrameSize < Constant.MIN_MAX_FRAME_SIZE) + { + LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " + + "client connectionTuneOk returned a frameMax (" + okMaxFrameSize + + ") below the minimum permitted size (" + Constant.MIN_MAX_FRAME_SIZE +")"); + + //Due to the error we must forcefully close the connection without negotiation + sconn.getSender().close(); + return; + } + else if(okMaxFrameSize == 0) + { + okMaxFrameSize = getFrameMax(); + } + final NetworkConnection networkConnection = sconn.getNetworkConnection(); if(ok.hasHeartbeat()) { @@ -266,6 +280,8 @@ public class ServerConnectionDelegate extends ServerDelegate } setConnectionTuneOkChannelMax(sconn, okChannelMax); + + conn.setMaxFrameSize(okMaxFrameSize); } @Override @@ -279,6 +295,12 @@ public class ServerConnectionDelegate extends ServerDelegate _maxNoOfChannels = channelMax; } + @Override + protected int getFrameMax() + { + return _maximumFrameSize; + } + @Override public void sessionDetach(Connection conn, SessionDetach dtc) { // To ensure a clean detach, we stop any remaining subscriptions. Stop ensures |