diff options
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-8-protocol')
5 files changed, 75 insertions, 27 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 46a9430814..1c264e52c6 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -44,37 +44,38 @@ import javax.security.auth.Subject; import javax.security.sasl.SaslServer; import org.apache.log4j.Logger; + import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.*; -import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.common.ServerPropertyNames; +import org.apache.qpid.framing.*; import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.logging.EventLogger; -import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.protocol.SessionModelListener; -import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.SessionModelListener; +import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterRegistry; -import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; +import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -93,7 +94,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi // channels. This value must be of the form 2^x - 1. private static final int CHANNEL_CACHE_SIZE = 0xff; private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024; - private final Port _port; + private final Port<?> _port; + private final long _creationTime; private AMQShortString _contextKey; @@ -123,7 +125,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private final AMQStateManager _stateManager; - private AMQCodecFactory _codecFactory; + private AMQDecoder _decoder; private SaslServer _saslServer; @@ -166,12 +168,13 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private final ReentrantLock _receivedLock; private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis()); - private final Broker _broker; + private final Broker<?> _broker; private final Transport _transport; private volatile boolean _closeWhenNoRoute; private volatile boolean _stopped; private long _readBytes; + private boolean _authenticated; public AMQProtocolEngine(Broker broker, final NetworkConnection network, @@ -185,7 +188,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _maxNoOfChannels = broker.getConnection_sessionCountLimit(); _receivedLock = new ReentrantLock(); _stateManager = new AMQStateManager(broker, this); - _codecFactory = new AMQCodecFactory(true, this); + _decoder = new AMQDecoder(true, this); _connectionID = connectionId; _logSubject = new ConnectionLogSubject(this); @@ -210,6 +213,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID()); _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID()); _dataReceived = new StatisticsCounter("data-received-" + getSessionID()); + _creationTime = System.currentTimeMillis(); } private <T> T runAsSubject(PrivilegedAction<T> action) @@ -247,6 +251,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void setMaxFrameSize(long frameMax) { _maxFrameSize = frameMax; + _decoder.setMaxFrameSize(frameMax); } public long getMaxFrameSize() @@ -277,7 +282,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi @Override public Void run() { + final long arrivalTime = System.currentTimeMillis(); + if(!_authenticated && + (arrivalTime - _creationTime) > _port.getContextValue(Long.class, + Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)) + { + _logger.warn("Connection has taken more than " + + _port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY) + + "ms to establish identity. Closing as possible DoS."); + getEventLogger().message(ConnectionMessages.IDLE_CLOSE()); + closeProtocolSession(); + } _lastReceivedTime = arrivalTime; _lastIoTime = arrivalTime; _readBytes += msg.remaining(); @@ -285,7 +301,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _receivedLock.lock(); try { - final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); + final ArrayList<AMQDataBlock> dataBlocks = _decoder.decodeBuffer(msg); for (AMQDataBlock dataBlock : dataBlocks) { try @@ -479,7 +495,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private synchronized void protocolInitiationReceived(ProtocolInitiation pi) { // this ensures the codec never checks for a PI message again - (_codecFactory.getDecoder()).setExpectProtocolInitiation(false); + _decoder.setExpectProtocolInitiation(false); try { // Log incoming protocol negotiation request @@ -1200,6 +1216,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi throw new IllegalArgumentException("authorizedSubject cannot be null"); } + _authenticated = true; _authorizedSubject.getPrincipals().addAll(authorizedSubject.getPrincipals()); _authorizedSubject.getPrivateCredentials().addAll(authorizedSubject.getPrivateCredentials()); _authorizedSubject.getPublicCredentials().addAll(authorizedSubject.getPublicCredentials()); @@ -1273,7 +1290,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void readerIdle() { - // TODO - enforce disconnect on lack of inbound data + Subject.doAs(_authorizedSubject, new PrivilegedAction<Object>() + { + @Override + public Object run() + { + getEventLogger().message(ConnectionMessages.IDLE_CLOSE()); + _network.close(); + return null; + } + }); } public synchronized void writerIdle() @@ -1344,7 +1370,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } @Override - public Port getPort() + public Port<?> getPort() { return _port; } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java index a2b596e2b1..92552cb011 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java @@ -33,7 +33,6 @@ import org.apache.qpid.framing.ConnectionSecureOkBody; import org.apache.qpid.framing.ConnectionTuneBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQState; @@ -59,7 +58,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener public void methodReceived(AMQStateManager stateManager, ConnectionSecureOkBody body, int channelId) throws AMQException { - Broker broker = stateManager.getBroker(); + Broker<?> broker = stateManager.getBroker(); AMQProtocolSession session = stateManager.getProtocolSession(); SubjectCreator subjectCreator = stateManager.getSubjectCreator(); @@ -99,7 +98,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), - BrokerProperties.FRAME_SIZE, + broker.getContextValue(Long.class, Broker.BROKER_FRAME_SIZE), broker.getConnection_heartBeatDelay()); session.writeFrame(tuneBody.generateFrame(0)); session.setAuthorizedSubject(authResult.getSubject()); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java index dc4f010a66..d6801c0fbc 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java @@ -32,7 +32,6 @@ import org.apache.qpid.framing.ConnectionStartOkBody; import org.apache.qpid.framing.ConnectionTuneBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQState; @@ -59,7 +58,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< public void methodReceived(AMQStateManager stateManager, ConnectionStartOkBody body, int channelId) throws AMQException { - Broker broker = stateManager.getBroker(); + Broker<?> broker = stateManager.getBroker(); AMQProtocolSession session = stateManager.getProtocolSession(); _logger.info("SASL Mechanism selected: " + body.getMechanism()); @@ -113,7 +112,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), - BrokerProperties.FRAME_SIZE, + broker.getContextValue(Long.class,Broker.BROKER_FRAME_SIZE), broker.getConnection_heartBeatDelay()); session.writeFrame(tuneBody.generateFrame(0)); break; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java index 5fddab6576..108c19dbaf 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java @@ -22,8 +22,11 @@ package org.apache.qpid.server.protocol.v0_8.handler; import org.apache.log4j.Logger; +import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ConnectionTuneOkBody; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; @@ -49,8 +52,29 @@ public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<C _logger.debug(body); } stateManager.changeState(AMQState.CONNECTION_NOT_OPENED); + session.initHeartbeats(body.getHeartbeat()); - session.setMaxFrameSize(body.getFrameMax()); + + long brokerFrameMax = stateManager.getBroker().getContextValue(Long.class,Broker.BROKER_FRAME_SIZE); + if(brokerFrameMax != 0 && body.getFrameMax() > brokerFrameMax) + { + throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR, + "Attempt to set max frame size to " + body.getFrameMax() + + "greater than the broker will allow: " + + brokerFrameMax, + body.getClazz(), body.getMethod(), + body.getMajor(), body.getMinor(),null); + } + else if(body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode()) + { + throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR, + "Attempt to set max frame size to " + body.getFrameMax() + + "which is smaller than the specification definined minimum: " + + AMQConstant.FRAME_MIN_SIZE.getCode(), + body.getClazz(), body.getMethod(), + body.getMajor(), body.getMinor(),null); + } + session.setMaxFrameSize(body.getFrameMax()== 0l ? (brokerFrameMax == 0l ? 0xFFFFFFFFl : brokerFrameMax) : body.getFrameMax()); long maxChannelNumber = body.getChannelMax(); //0 means no implied limit, except that forced by protocol limitations (0xFFFF) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java index cb9295ac49..3c1f1dedc3 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java @@ -51,12 +51,12 @@ public class AMQStateManager implements AMQMethodListener { private static final Logger _logger = Logger.getLogger(AMQStateManager.class); - private final Broker _broker; + private final Broker<?> _broker; private final AMQProtocolSession _protocolSession; /** The current state */ private AMQState _currentState; - public AMQStateManager(Broker broker, AMQProtocolSession protocolSession) + public AMQStateManager(Broker<?> broker, AMQProtocolSession protocolSession) { _broker = broker; _protocolSession = protocolSession; @@ -69,7 +69,7 @@ public class AMQStateManager implements AMQMethodListener * * @return the Broker */ - public Broker getBroker() + public Broker<?> getBroker() { return _broker; } |