diff options
Diffstat (limited to 'qpid/java/broker-plugins')
12 files changed, 264 insertions, 121 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index cefd1ee0b2..dc60a37a7f 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -20,27 +20,32 @@ */ package org.apache.qpid.server.protocol.v0_10; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.security.AccessController; +import java.security.PrivilegedAction; + +import javax.security.auth.Subject; + +import org.apache.log4j.Logger; + import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.transport.Constant; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.Assembler; import org.apache.qpid.transport.network.Disassembler; import org.apache.qpid.transport.network.InputHandler; import org.apache.qpid.transport.network.NetworkConnection; -import javax.security.auth.Subject; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.security.AccessController; -import java.security.PrivilegedAction; - public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocolEngine { public static final int MAX_FRAME_SIZE = 64 * 1024 - 1; + private static final Logger _logger = Logger.getLogger(ProtocolEngine_0_10.class); + private NetworkConnection _network; private long _readBytes; @@ -87,7 +92,9 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol _network = network; _connection.setNetworkConnection(network); - _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE)); + Disassembler disassembler = new Disassembler(wrapSender(sender), Constant.MIN_MAX_FRAME_SIZE); + _connection.setSender(disassembler); + _connection.addFrameSizeObserver(disassembler); // FIXME Two log messages to maintain compatibility with earlier protocol versions _connection.getEventLogger().message(ConnectionMessages.OPEN(null, "0-10", null, null, false, true, false, false)); @@ -154,6 +161,26 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol public void received(final ByteBuffer buf) { _lastReadTime = System.currentTimeMillis(); + if(_connection.getAuthorizedPrincipal() == null && + (_lastReadTime - _createTime) > _connection.getPort().getContextValue(Long.class, + Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY) ) + { + Subject.doAs(_connection.getAuthorizedSubject(), new PrivilegedAction<Object>() + { + @Override + public Object run() + { + + _logger.warn("Connection has taken more than " + + _connection.getPort() + .getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY) + + "ms to establish identity. Closing as possible DoS."); + _connection.getEventLogger().message(ConnectionMessages.IDLE_CLOSE()); + _network.close(); + return null; + } + }); + } super.received(buf); _connection.receivedComplete(); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 2ad79ad980..8ddd04f51a 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -75,7 +75,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S private final long _connectionId; private final Object _reference = new Object(); private VirtualHostImpl _virtualHost; - private Port _port; + private Port<?> _port; private AtomicLong _lastIoTime = new AtomicLong(); private boolean _blocking; private Transport _transport; @@ -189,12 +189,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S } @Override - public Port getPort() + public Port<?> getPort() { return _port; } - public void setPort(Port port) + public void setPort(Port<?> port) { _port = port; } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index 7751ff765d..bab2d802e8 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -50,21 +50,7 @@ import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import org.apache.qpid.transport.Binary; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.ConnectionClose; -import org.apache.qpid.transport.ConnectionCloseCode; -import org.apache.qpid.transport.ConnectionOpen; -import org.apache.qpid.transport.ConnectionOpenOk; -import org.apache.qpid.transport.ConnectionStartOk; -import org.apache.qpid.transport.ConnectionTuneOk; -import org.apache.qpid.transport.ServerDelegate; -import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.SessionAttach; -import org.apache.qpid.transport.SessionDelegate; -import org.apache.qpid.transport.SessionDetach; -import org.apache.qpid.transport.SessionDetachCode; -import org.apache.qpid.transport.SessionDetached; +import org.apache.qpid.transport.*; import org.apache.qpid.transport.network.NetworkConnection; public class ServerConnectionDelegate extends ServerDelegate @@ -76,15 +62,16 @@ public class ServerConnectionDelegate extends ServerDelegate private int _maxNoOfChannels; private Map<String,Object> _clientProperties; private final SubjectCreator _subjectCreator; + private int _maximumFrameSize; - public ServerConnectionDelegate(Broker broker, String localFQDN, SubjectCreator subjectCreator) + public ServerConnectionDelegate(Broker<?> broker, String localFQDN, SubjectCreator subjectCreator) { this(createConnectionProperties(broker), Collections.singletonList((Object)"en_US"), broker, localFQDN, subjectCreator); } private ServerConnectionDelegate(Map<String, Object> properties, List<Object> locales, - Broker broker, + Broker<?> broker, String localFQDN, SubjectCreator subjectCreator) { @@ -94,9 +81,10 @@ public class ServerConnectionDelegate extends ServerDelegate _localFQDN = localFQDN; _maxNoOfChannels = broker.getConnection_sessionCountLimit(); _subjectCreator = subjectCreator; + _maximumFrameSize = (int) Math.min(0xffffl, broker.getContextValue(Long.class, Broker.BROKER_FRAME_SIZE)); } - private static List<String> getFeatures(Broker broker) + private static List<String> getFeatures(Broker<?> broker) { String brokerDisabledFeatures = System.getProperty(BrokerProperties.PROPERTY_DISABLED_FEATURES); final List<String> features = new ArrayList<String>(); @@ -108,7 +96,7 @@ public class ServerConnectionDelegate extends ServerDelegate return Collections.unmodifiableList(features); } - private static Map<String, Object> createConnectionProperties(final Broker broker) + private static Map<String, Object> createConnectionProperties(final Broker<?> broker) { final Map<String,Object> map = new HashMap<String,Object>(); // Federation tag is used by the client to identify the broker instance @@ -234,6 +222,7 @@ public class ServerConnectionDelegate extends ServerDelegate { ServerConnection sconn = (ServerConnection) conn; int okChannelMax = ok.getChannelMax(); + int okMaxFrameSize = ok.getMaxFrameSize(); if (okChannelMax > getChannelMax()) { @@ -246,6 +235,31 @@ public class ServerConnectionDelegate extends ServerDelegate return; } + if(okMaxFrameSize > getFrameMax()) + { + LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " + + "client connectionTuneOk returned a frameMax (" + okMaxFrameSize + + ") above the server's offered limit (" + getFrameMax() +")"); + + //Due to the error we must forcefully close the connection without negotiation + sconn.getSender().close(); + return; + } + else if(okMaxFrameSize > 0 && okMaxFrameSize < Constant.MIN_MAX_FRAME_SIZE) + { + LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " + + "client connectionTuneOk returned a frameMax (" + okMaxFrameSize + + ") below the minimum permitted size (" + Constant.MIN_MAX_FRAME_SIZE +")"); + + //Due to the error we must forcefully close the connection without negotiation + sconn.getSender().close(); + return; + } + else if(okMaxFrameSize == 0) + { + okMaxFrameSize = getFrameMax(); + } + final NetworkConnection networkConnection = sconn.getNetworkConnection(); if(ok.hasHeartbeat()) { @@ -266,6 +280,8 @@ public class ServerConnectionDelegate extends ServerDelegate } setConnectionTuneOkChannelMax(sconn, okChannelMax); + + conn.setMaxFrameSize(okMaxFrameSize); } @Override @@ -279,6 +295,12 @@ public class ServerConnectionDelegate extends ServerDelegate _maxNoOfChannels = channelMax; } + @Override + protected int getFrameMax() + { + return _maximumFrameSize; + } + @Override public void sessionDetach(Connection conn, SessionDetach dtc) { // To ensure a clean detach, we stop any remaining subscriptions. Stop ensures diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 46a9430814..1c264e52c6 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -44,37 +44,38 @@ import javax.security.auth.Subject; import javax.security.sasl.SaslServer; import org.apache.log4j.Logger; + import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.*; -import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.common.ServerPropertyNames; +import org.apache.qpid.framing.*; import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.logging.EventLogger; -import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.protocol.SessionModelListener; -import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.SessionModelListener; +import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterRegistry; -import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; +import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -93,7 +94,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi // channels. This value must be of the form 2^x - 1. private static final int CHANNEL_CACHE_SIZE = 0xff; private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024; - private final Port _port; + private final Port<?> _port; + private final long _creationTime; private AMQShortString _contextKey; @@ -123,7 +125,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private final AMQStateManager _stateManager; - private AMQCodecFactory _codecFactory; + private AMQDecoder _decoder; private SaslServer _saslServer; @@ -166,12 +168,13 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private final ReentrantLock _receivedLock; private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis()); - private final Broker _broker; + private final Broker<?> _broker; private final Transport _transport; private volatile boolean _closeWhenNoRoute; private volatile boolean _stopped; private long _readBytes; + private boolean _authenticated; public AMQProtocolEngine(Broker broker, final NetworkConnection network, @@ -185,7 +188,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _maxNoOfChannels = broker.getConnection_sessionCountLimit(); _receivedLock = new ReentrantLock(); _stateManager = new AMQStateManager(broker, this); - _codecFactory = new AMQCodecFactory(true, this); + _decoder = new AMQDecoder(true, this); _connectionID = connectionId; _logSubject = new ConnectionLogSubject(this); @@ -210,6 +213,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID()); _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID()); _dataReceived = new StatisticsCounter("data-received-" + getSessionID()); + _creationTime = System.currentTimeMillis(); } private <T> T runAsSubject(PrivilegedAction<T> action) @@ -247,6 +251,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void setMaxFrameSize(long frameMax) { _maxFrameSize = frameMax; + _decoder.setMaxFrameSize(frameMax); } public long getMaxFrameSize() @@ -277,7 +282,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi @Override public Void run() { + final long arrivalTime = System.currentTimeMillis(); + if(!_authenticated && + (arrivalTime - _creationTime) > _port.getContextValue(Long.class, + Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)) + { + _logger.warn("Connection has taken more than " + + _port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY) + + "ms to establish identity. Closing as possible DoS."); + getEventLogger().message(ConnectionMessages.IDLE_CLOSE()); + closeProtocolSession(); + } _lastReceivedTime = arrivalTime; _lastIoTime = arrivalTime; _readBytes += msg.remaining(); @@ -285,7 +301,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _receivedLock.lock(); try { - final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); + final ArrayList<AMQDataBlock> dataBlocks = _decoder.decodeBuffer(msg); for (AMQDataBlock dataBlock : dataBlocks) { try @@ -479,7 +495,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private synchronized void protocolInitiationReceived(ProtocolInitiation pi) { // this ensures the codec never checks for a PI message again - (_codecFactory.getDecoder()).setExpectProtocolInitiation(false); + _decoder.setExpectProtocolInitiation(false); try { // Log incoming protocol negotiation request @@ -1200,6 +1216,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi throw new IllegalArgumentException("authorizedSubject cannot be null"); } + _authenticated = true; _authorizedSubject.getPrincipals().addAll(authorizedSubject.getPrincipals()); _authorizedSubject.getPrivateCredentials().addAll(authorizedSubject.getPrivateCredentials()); _authorizedSubject.getPublicCredentials().addAll(authorizedSubject.getPublicCredentials()); @@ -1273,7 +1290,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void readerIdle() { - // TODO - enforce disconnect on lack of inbound data + Subject.doAs(_authorizedSubject, new PrivilegedAction<Object>() + { + @Override + public Object run() + { + getEventLogger().message(ConnectionMessages.IDLE_CLOSE()); + _network.close(); + return null; + } + }); } public synchronized void writerIdle() @@ -1344,7 +1370,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } @Override - public Port getPort() + public Port<?> getPort() { return _port; } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java index a2b596e2b1..92552cb011 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java @@ -33,7 +33,6 @@ import org.apache.qpid.framing.ConnectionSecureOkBody; import org.apache.qpid.framing.ConnectionTuneBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQState; @@ -59,7 +58,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener public void methodReceived(AMQStateManager stateManager, ConnectionSecureOkBody body, int channelId) throws AMQException { - Broker broker = stateManager.getBroker(); + Broker<?> broker = stateManager.getBroker(); AMQProtocolSession session = stateManager.getProtocolSession(); SubjectCreator subjectCreator = stateManager.getSubjectCreator(); @@ -99,7 +98,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), - BrokerProperties.FRAME_SIZE, + broker.getContextValue(Long.class, Broker.BROKER_FRAME_SIZE), broker.getConnection_heartBeatDelay()); session.writeFrame(tuneBody.generateFrame(0)); session.setAuthorizedSubject(authResult.getSubject()); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java index dc4f010a66..d6801c0fbc 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java @@ -32,7 +32,6 @@ import org.apache.qpid.framing.ConnectionStartOkBody; import org.apache.qpid.framing.ConnectionTuneBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQState; @@ -59,7 +58,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< public void methodReceived(AMQStateManager stateManager, ConnectionStartOkBody body, int channelId) throws AMQException { - Broker broker = stateManager.getBroker(); + Broker<?> broker = stateManager.getBroker(); AMQProtocolSession session = stateManager.getProtocolSession(); _logger.info("SASL Mechanism selected: " + body.getMechanism()); @@ -113,7 +112,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), - BrokerProperties.FRAME_SIZE, + broker.getContextValue(Long.class,Broker.BROKER_FRAME_SIZE), broker.getConnection_heartBeatDelay()); session.writeFrame(tuneBody.generateFrame(0)); break; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java index 5fddab6576..108c19dbaf 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java @@ -22,8 +22,11 @@ package org.apache.qpid.server.protocol.v0_8.handler; import org.apache.log4j.Logger; +import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ConnectionTuneOkBody; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; @@ -49,8 +52,29 @@ public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<C _logger.debug(body); } stateManager.changeState(AMQState.CONNECTION_NOT_OPENED); + session.initHeartbeats(body.getHeartbeat()); - session.setMaxFrameSize(body.getFrameMax()); + + long brokerFrameMax = stateManager.getBroker().getContextValue(Long.class,Broker.BROKER_FRAME_SIZE); + if(brokerFrameMax != 0 && body.getFrameMax() > brokerFrameMax) + { + throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR, + "Attempt to set max frame size to " + body.getFrameMax() + + "greater than the broker will allow: " + + brokerFrameMax, + body.getClazz(), body.getMethod(), + body.getMajor(), body.getMinor(),null); + } + else if(body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode()) + { + throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR, + "Attempt to set max frame size to " + body.getFrameMax() + + "which is smaller than the specification definined minimum: " + + AMQConstant.FRAME_MIN_SIZE.getCode(), + body.getClazz(), body.getMethod(), + body.getMajor(), body.getMinor(),null); + } + session.setMaxFrameSize(body.getFrameMax()== 0l ? (brokerFrameMax == 0l ? 0xFFFFFFFFl : brokerFrameMax) : body.getFrameMax()); long maxChannelNumber = body.getChannelMax(); //0 means no implied limit, except that forced by protocol limitations (0xFFFF) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java index cb9295ac49..3c1f1dedc3 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java @@ -51,12 +51,12 @@ public class AMQStateManager implements AMQMethodListener { private static final Logger _logger = Logger.getLogger(AMQStateManager.class); - private final Broker _broker; + private final Broker<?> _broker; private final AMQProtocolSession _protocolSession; /** The current state */ private AMQState _currentState; - public AMQStateManager(Broker broker, AMQProtocolSession protocolSession) + public AMQStateManager(Broker<?> broker, AMQProtocolSession protocolSession) { _broker = broker; _protocolSession = protocolSession; @@ -69,7 +69,7 @@ public class AMQStateManager implements AMQMethodListener * * @return the Broker */ - public Broker getBroker() + public Broker<?> getBroker() { return _broker; } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index ffa65b2477..2a48ccb2df 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -62,7 +62,7 @@ import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class Connection_1_0 implements ConnectionEventListener, AMQConnectionModel<Connection_1_0,Session_1_0> { - private final Port _port; + private final Port<?> _port; private final Broker _broker; private final SubjectCreator _subjectCreator; private VirtualHostImpl _vhost; @@ -358,7 +358,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod } @Override - public Port getPort() + public Port<?> getPort() { return _port; } diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js index 11a79984b3..8cc3e76b58 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js @@ -356,20 +356,20 @@ define(["dojo/_base/xhr", that.vhostsGrid = new UpdatableStore(that.brokerData.virtualhostnodes, query(".broker-virtualhosts")[0], [ - { name: "Node Name", field: "name", width: "15%"}, + { name: "Node Name", field: "name", width: "10%"}, { name: "Node State", field: "state", width: "10%"}, { name: "Node Type", field: "type", width: "10%"}, - { name: "Host Name", field: "_item", width: "15%", + { name: "Host Name", field: "_item", width: "10%", formatter: function(item){ return item && item.virtualhosts? item.virtualhosts[0].name: "N/A"; } }, - { name: "Host State", field: "_item", width: "10%", + { name: "Host State", field: "_item", width: "15%", formatter: function(item){ return item && item.virtualhosts? item.virtualhosts[0].state: "N/A"; } }, - { name: "Host Type", field: "_item", width: "10%", + { name: "Host Type", field: "_item", width: "15%", formatter: function(item){ return item && item.virtualhosts? item.virtualhosts[0].type: "N/A"; } diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js index 025390b9ff..59e49f3302 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js @@ -333,7 +333,11 @@ define(["dojo/_base/xhr", "bytesInRateUnits", "msgOutRate", "bytesOutRate", - "bytesOutRateUnits"]); + "bytesOutRateUnits", + "queueFlowResumeSizeBytes", + "queueFlowControlSizeBytes", + "maximumDeliveryAttempts", + "oldestMessageAge"]); @@ -413,6 +417,13 @@ define(["dojo/_base/xhr", { this.messageGroups.style.display = "none"; } + + this.queueFlowControlSizeBytes.innerHTML = entities.encode(String(this.queueData[ "queueFlowControlSizeBytes" ])); + this.queueFlowResumeSizeBytes.innerHTML = entities.encode(String(this.queueData[ "queueFlowResumeSizeBytes" ])); + + this.oldestMessageAge.innerHTML = entities.encode(String(this.queueData[ "oldestMessageAge" ] / 1000)); + var maximumDeliveryAttempts = this.queueData[ "maximumDeliveryAttempts" ]; + this.maximumDeliveryAttempts.innerHTML = entities.encode(String( maximumDeliveryAttempts == 0 ? "" : maximumDeliveryAttempts)); }; QueueUpdater.prototype.update = function() diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html index 52903a80ea..961f60e214 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html @@ -20,22 +20,81 @@ --> <div class="queue"> <div data-dojo-type="dijit.TitlePane" data-dojo-props="title: 'Queue Attributes', open: true"> + <div class="clear"> <div class="formLabel-labelCell">Name:</div> - <div class="name"></div> - </div> - <div class="clear"> - <div class="formLabel-labelCell">State:</div> - <div class="state"></div> + <div class="name formValue-valueCell"></div> </div> <div class="clear"> - <div class="formLabel-labelCell">Durable:</div> - <div class="durable"></div> + <div class="alignLeft"> + <div class="clear"> + <div class="formLabel-labelCell">Type:</div> + <div class="type formValue-valueCell"></div> + <div class="typeQualifier formValue-valueCell"></div> + </div> + <div class="clear"> + <div class="formLabel-labelCell">State:</div> + <div class="state formValue-valueCell"></div> + </div> + <div class="clear"> + <div class="formLabel-labelCell">Durable:</div> + <div class="durable formValue-valueCell"></div> + </div> + <div class="clear"> + <div class="formLabel-labelCell">Lifespan:</div> + <div class="lifetimePolicy formValue-valueCell"></div> + </div> + <div class="clear"> + <div class="formLabel-labelCell">Persist Messages:</div> + <div class="messageDurability formValue-valueCell"></div> + </div> </div> - <div class="clear"> - <div class="formLabel-labelCell">Persist Messages:</div> - <div class="messageDurability"></div> + <div class="alignRight"> + <div> + <div class="formLabel-labelCell">Inbound:</div> + <div class="formValue-valueCell"> + <span class="msgInRate"></span> + <span> msg/s</span> + <span class="bytesInRate"></span> + <span class="bytesInRateUnits"></span> + </div> + </div> + <div> + <div class="formLabel-labelCell">Outbound:</div> + <div class="formValue-valueCell"> + <span class="msgOutRate"></span> + <span> msg/s</span> + <span class="bytesOutRate"></span> + <span class="bytesOutRateUnits"></span> + </div> + </div> + <div> + <div class="formLabel-labelCell">Size:</div> + <div class="formValue-valueCell"> + <span class="queueDepthMessages"></span> + <span> msgs</span> + <span class="queueDepthBytes">(</span> + <span class="queueDepthBytesUnits">)</span> + </div> + </div> + <div> + <div class="formLabel-labelCell">Pre-fetched:</div> + <div class="formValue-valueCell"> + <span class="unacknowledgedMessages"></span> + <span> msgs</span> + <span class="unacknowledgedBytes">(</span> + <span class="unacknowledgedBytesUnits">)</span> + </div> + </div> + <div> + <div class="formLabel-labelCell">Oldest Message Age:</div> + <div class="formValue-valueCell"> + <span class="oldestMessageAge"></span> + <span> secs</span> + </div> + </div> </div> + <div class="clear"></div> <div class="clear"> <div class="formLabel-labelCell">Enforced Max. Ttl(ms):</div> <div class="maximumMessageTtl"></div> @@ -53,55 +112,12 @@ <div class="owner"></div> </div> <div class="clear"> - <div class="formLabel-labelCell">Lifespan:</div> - <div class="lifetimePolicy"></div> - </div> - <div class="clear"> - <div class="formLabel-labelCell">Type:</div> - <div> - <span class="type"></span> - <span class="typeQualifier"></span> - </div> - </div> - <div class="clear"> - <div class="formLabel-labelCell">Size:</div> - <div> - <span class="queueDepthMessages"></span> - <span> msgs</span> - <span class="queueDepthBytes">(</span> - <span class="queueDepthBytesUnits">)</span> - </div> - </div> - <div class="clear"> - <div class="formLabel-labelCell">Pre-fetched:</div> - <div> - <span class="unacknowledgedMessages"></span> - <span> msgs</span> - <span class="unacknowledgedBytes">(</span> - <span class="unacknowledgedBytesUnits">)</span> - </div> - </div> - <div class="clear"> - <div class="formLabel-labelCell">Inbound:</div> - <div> - <span class="msgInRate"></span> - <span> msg/s</span> - <span class="bytesInRate">(</span> - <span class="bytesInRateUnits">)</span> - </div> + <div class="formLabel-labelCell">Alternate Exchange:</div> + <div class="alternateExchange"></div> </div> <div class="clear"> - <div class="formLabel-labelCell">Outbound:</div> - <div> - <span class="msgOutRate"></span> - <span> msg/s</span> - <span class="bytesOutRate">(</span> - <span class="bytesOutRateUnits">)</span> - </div> - </div> - <div class="clear"> - <div class="formLabel-labelCell">AlternateExchange:</div> - <div class="alternateExchange"></div> + <div class="formLabel-labelCell">Maximum Delivery Attempts:</div> + <div class="maximumDeliveryAttempts"></div> </div> <div class="clear messageGroups"> <div class="clear"> @@ -133,6 +149,25 @@ <button data-dojo-type="dijit.form.Button" class="copyMessagesButton" type="button">Copy Messages</button> </div> <br/> + <div data-dojo-type="dijit.TitlePane" data-dojo-props="title: 'Flow Control Settings', open: false"> + <div class="clear"> + <div class="formLabel-labelCell">Capacity:</div> + <div> + <span class="queueFlowControlSizeBytes"></span> + <span>B</span> + </div> + </div> + <div class="clear"> + <div class="formLabel-labelCell">Resume Capacity:</div> + <div> + <span class="queueFlowResumeSizeBytes"></span> + <span>B</span> + </div> + </div> + <div class="clear"></div> + </div> + + <br/> <div data-dojo-type="dijit.TitlePane" data-dojo-props="title: 'Alerting Thresholds', open: false"> <div class="clear"> <div class="formLabel-labelCell">Queue Depth:</div> |