diff options
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java')
-rw-r--r-- | qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java | 60 |
1 files changed, 43 insertions, 17 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 46a9430814..1c264e52c6 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -44,37 +44,38 @@ import javax.security.auth.Subject; import javax.security.sasl.SaslServer; import org.apache.log4j.Logger; + import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.*; -import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.common.ServerPropertyNames; +import org.apache.qpid.framing.*; import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.logging.EventLogger; -import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.protocol.SessionModelListener; -import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.SessionModelListener; +import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterRegistry; -import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; +import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -93,7 +94,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi // channels. This value must be of the form 2^x - 1. private static final int CHANNEL_CACHE_SIZE = 0xff; private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024; - private final Port _port; + private final Port<?> _port; + private final long _creationTime; private AMQShortString _contextKey; @@ -123,7 +125,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private final AMQStateManager _stateManager; - private AMQCodecFactory _codecFactory; + private AMQDecoder _decoder; private SaslServer _saslServer; @@ -166,12 +168,13 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private final ReentrantLock _receivedLock; private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis()); - private final Broker _broker; + private final Broker<?> _broker; private final Transport _transport; private volatile boolean _closeWhenNoRoute; private volatile boolean _stopped; private long _readBytes; + private boolean _authenticated; public AMQProtocolEngine(Broker broker, final NetworkConnection network, @@ -185,7 +188,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _maxNoOfChannels = broker.getConnection_sessionCountLimit(); _receivedLock = new ReentrantLock(); _stateManager = new AMQStateManager(broker, this); - _codecFactory = new AMQCodecFactory(true, this); + _decoder = new AMQDecoder(true, this); _connectionID = connectionId; _logSubject = new ConnectionLogSubject(this); @@ -210,6 +213,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID()); _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID()); _dataReceived = new StatisticsCounter("data-received-" + getSessionID()); + _creationTime = System.currentTimeMillis(); } private <T> T runAsSubject(PrivilegedAction<T> action) @@ -247,6 +251,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void setMaxFrameSize(long frameMax) { _maxFrameSize = frameMax; + _decoder.setMaxFrameSize(frameMax); } public long getMaxFrameSize() @@ -277,7 +282,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi @Override public Void run() { + final long arrivalTime = System.currentTimeMillis(); + if(!_authenticated && + (arrivalTime - _creationTime) > _port.getContextValue(Long.class, + Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)) + { + _logger.warn("Connection has taken more than " + + _port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY) + + "ms to establish identity. Closing as possible DoS."); + getEventLogger().message(ConnectionMessages.IDLE_CLOSE()); + closeProtocolSession(); + } _lastReceivedTime = arrivalTime; _lastIoTime = arrivalTime; _readBytes += msg.remaining(); @@ -285,7 +301,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _receivedLock.lock(); try { - final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); + final ArrayList<AMQDataBlock> dataBlocks = _decoder.decodeBuffer(msg); for (AMQDataBlock dataBlock : dataBlocks) { try @@ -479,7 +495,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private synchronized void protocolInitiationReceived(ProtocolInitiation pi) { // this ensures the codec never checks for a PI message again - (_codecFactory.getDecoder()).setExpectProtocolInitiation(false); + _decoder.setExpectProtocolInitiation(false); try { // Log incoming protocol negotiation request @@ -1200,6 +1216,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi throw new IllegalArgumentException("authorizedSubject cannot be null"); } + _authenticated = true; _authorizedSubject.getPrincipals().addAll(authorizedSubject.getPrincipals()); _authorizedSubject.getPrivateCredentials().addAll(authorizedSubject.getPrivateCredentials()); _authorizedSubject.getPublicCredentials().addAll(authorizedSubject.getPublicCredentials()); @@ -1273,7 +1290,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void readerIdle() { - // TODO - enforce disconnect on lack of inbound data + Subject.doAs(_authorizedSubject, new PrivilegedAction<Object>() + { + @Override + public Object run() + { + getEventLogger().message(ConnectionMessages.IDLE_CLOSE()); + _network.close(); + return null; + } + }); } public synchronized void writerIdle() @@ -1344,7 +1370,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } @Override - public Port getPort() + public Port<?> getPort() { return _port; } |