summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java')
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java60
1 files changed, 43 insertions, 17 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 46a9430814..1c264e52c6 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -44,37 +44,38 @@ import javax.security.auth.Subject;
import javax.security.sasl.SaslServer;
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.common.ServerPropertyNames;
+import org.apache.qpid.framing.*;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.SessionModelListener;
-import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.SessionModelListener;
+import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterRegistry;
-import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
+import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -93,7 +94,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
// channels. This value must be of the form 2^x - 1.
private static final int CHANNEL_CACHE_SIZE = 0xff;
private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024;
- private final Port _port;
+ private final Port<?> _port;
+ private final long _creationTime;
private AMQShortString _contextKey;
@@ -123,7 +125,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private final AMQStateManager _stateManager;
- private AMQCodecFactory _codecFactory;
+ private AMQDecoder _decoder;
private SaslServer _saslServer;
@@ -166,12 +168,13 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private final ReentrantLock _receivedLock;
private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis());
- private final Broker _broker;
+ private final Broker<?> _broker;
private final Transport _transport;
private volatile boolean _closeWhenNoRoute;
private volatile boolean _stopped;
private long _readBytes;
+ private boolean _authenticated;
public AMQProtocolEngine(Broker broker,
final NetworkConnection network,
@@ -185,7 +188,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_maxNoOfChannels = broker.getConnection_sessionCountLimit();
_receivedLock = new ReentrantLock();
_stateManager = new AMQStateManager(broker, this);
- _codecFactory = new AMQCodecFactory(true, this);
+ _decoder = new AMQDecoder(true, this);
_connectionID = connectionId;
_logSubject = new ConnectionLogSubject(this);
@@ -210,6 +213,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID());
_messagesReceived = new StatisticsCounter("messages-received-" + getSessionID());
_dataReceived = new StatisticsCounter("data-received-" + getSessionID());
+ _creationTime = System.currentTimeMillis();
}
private <T> T runAsSubject(PrivilegedAction<T> action)
@@ -247,6 +251,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
public void setMaxFrameSize(long frameMax)
{
_maxFrameSize = frameMax;
+ _decoder.setMaxFrameSize(frameMax);
}
public long getMaxFrameSize()
@@ -277,7 +282,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
@Override
public Void run()
{
+
final long arrivalTime = System.currentTimeMillis();
+ if(!_authenticated &&
+ (arrivalTime - _creationTime) > _port.getContextValue(Long.class,
+ Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))
+ {
+ _logger.warn("Connection has taken more than "
+ + _port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)
+ + "ms to establish identity. Closing as possible DoS.");
+ getEventLogger().message(ConnectionMessages.IDLE_CLOSE());
+ closeProtocolSession();
+ }
_lastReceivedTime = arrivalTime;
_lastIoTime = arrivalTime;
_readBytes += msg.remaining();
@@ -285,7 +301,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_receivedLock.lock();
try
{
- final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
+ final ArrayList<AMQDataBlock> dataBlocks = _decoder.decodeBuffer(msg);
for (AMQDataBlock dataBlock : dataBlocks)
{
try
@@ -479,7 +495,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private synchronized void protocolInitiationReceived(ProtocolInitiation pi)
{
// this ensures the codec never checks for a PI message again
- (_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
+ _decoder.setExpectProtocolInitiation(false);
try
{
// Log incoming protocol negotiation request
@@ -1200,6 +1216,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
throw new IllegalArgumentException("authorizedSubject cannot be null");
}
+ _authenticated = true;
_authorizedSubject.getPrincipals().addAll(authorizedSubject.getPrincipals());
_authorizedSubject.getPrivateCredentials().addAll(authorizedSubject.getPrivateCredentials());
_authorizedSubject.getPublicCredentials().addAll(authorizedSubject.getPublicCredentials());
@@ -1273,7 +1290,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
public void readerIdle()
{
- // TODO - enforce disconnect on lack of inbound data
+ Subject.doAs(_authorizedSubject, new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ getEventLogger().message(ConnectionMessages.IDLE_CLOSE());
+ _network.close();
+ return null;
+ }
+ });
}
public synchronized void writerIdle()
@@ -1344,7 +1370,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
@Override
- public Port getPort()
+ public Port<?> getPort()
{
return _port;
}