summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-plugins')
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java43
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java60
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java60
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java26
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java6
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java4
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js8
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js13
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html149
12 files changed, 264 insertions, 121 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index cefd1ee0b2..dc60a37a7f 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -20,27 +20,32 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+import javax.security.auth.Subject;
+
+import org.apache.log4j.Logger;
+
import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.transport.Constant;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.Disassembler;
import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.network.NetworkConnection;
-import javax.security.auth.Subject;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-
public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocolEngine
{
public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
+ private static final Logger _logger = Logger.getLogger(ProtocolEngine_0_10.class);
+
private NetworkConnection _network;
private long _readBytes;
@@ -87,7 +92,9 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
_network = network;
_connection.setNetworkConnection(network);
- _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE));
+ Disassembler disassembler = new Disassembler(wrapSender(sender), Constant.MIN_MAX_FRAME_SIZE);
+ _connection.setSender(disassembler);
+ _connection.addFrameSizeObserver(disassembler);
// FIXME Two log messages to maintain compatibility with earlier protocol versions
_connection.getEventLogger().message(ConnectionMessages.OPEN(null, "0-10", null, null, false, true, false, false));
@@ -154,6 +161,26 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
public void received(final ByteBuffer buf)
{
_lastReadTime = System.currentTimeMillis();
+ if(_connection.getAuthorizedPrincipal() == null &&
+ (_lastReadTime - _createTime) > _connection.getPort().getContextValue(Long.class,
+ Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY) )
+ {
+ Subject.doAs(_connection.getAuthorizedSubject(), new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+
+ _logger.warn("Connection has taken more than "
+ + _connection.getPort()
+ .getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)
+ + "ms to establish identity. Closing as possible DoS.");
+ _connection.getEventLogger().message(ConnectionMessages.IDLE_CLOSE());
+ _network.close();
+ return null;
+ }
+ });
+ }
super.received(buf);
_connection.receivedComplete();
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 2ad79ad980..8ddd04f51a 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -75,7 +75,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
private final long _connectionId;
private final Object _reference = new Object();
private VirtualHostImpl _virtualHost;
- private Port _port;
+ private Port<?> _port;
private AtomicLong _lastIoTime = new AtomicLong();
private boolean _blocking;
private Transport _transport;
@@ -189,12 +189,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
}
@Override
- public Port getPort()
+ public Port<?> getPort()
{
return _port;
}
- public void setPort(Port port)
+ public void setPort(Port<?> port)
{
_port = port;
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index 7751ff765d..bab2d802e8 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -50,21 +50,7 @@ import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.apache.qpid.transport.Binary;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionClose;
-import org.apache.qpid.transport.ConnectionCloseCode;
-import org.apache.qpid.transport.ConnectionOpen;
-import org.apache.qpid.transport.ConnectionOpenOk;
-import org.apache.qpid.transport.ConnectionStartOk;
-import org.apache.qpid.transport.ConnectionTuneOk;
-import org.apache.qpid.transport.ServerDelegate;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionAttach;
-import org.apache.qpid.transport.SessionDelegate;
-import org.apache.qpid.transport.SessionDetach;
-import org.apache.qpid.transport.SessionDetachCode;
-import org.apache.qpid.transport.SessionDetached;
+import org.apache.qpid.transport.*;
import org.apache.qpid.transport.network.NetworkConnection;
public class ServerConnectionDelegate extends ServerDelegate
@@ -76,15 +62,16 @@ public class ServerConnectionDelegate extends ServerDelegate
private int _maxNoOfChannels;
private Map<String,Object> _clientProperties;
private final SubjectCreator _subjectCreator;
+ private int _maximumFrameSize;
- public ServerConnectionDelegate(Broker broker, String localFQDN, SubjectCreator subjectCreator)
+ public ServerConnectionDelegate(Broker<?> broker, String localFQDN, SubjectCreator subjectCreator)
{
this(createConnectionProperties(broker), Collections.singletonList((Object)"en_US"), broker, localFQDN, subjectCreator);
}
private ServerConnectionDelegate(Map<String, Object> properties,
List<Object> locales,
- Broker broker,
+ Broker<?> broker,
String localFQDN,
SubjectCreator subjectCreator)
{
@@ -94,9 +81,10 @@ public class ServerConnectionDelegate extends ServerDelegate
_localFQDN = localFQDN;
_maxNoOfChannels = broker.getConnection_sessionCountLimit();
_subjectCreator = subjectCreator;
+ _maximumFrameSize = (int) Math.min(0xffffl, broker.getContextValue(Long.class, Broker.BROKER_FRAME_SIZE));
}
- private static List<String> getFeatures(Broker broker)
+ private static List<String> getFeatures(Broker<?> broker)
{
String brokerDisabledFeatures = System.getProperty(BrokerProperties.PROPERTY_DISABLED_FEATURES);
final List<String> features = new ArrayList<String>();
@@ -108,7 +96,7 @@ public class ServerConnectionDelegate extends ServerDelegate
return Collections.unmodifiableList(features);
}
- private static Map<String, Object> createConnectionProperties(final Broker broker)
+ private static Map<String, Object> createConnectionProperties(final Broker<?> broker)
{
final Map<String,Object> map = new HashMap<String,Object>();
// Federation tag is used by the client to identify the broker instance
@@ -234,6 +222,7 @@ public class ServerConnectionDelegate extends ServerDelegate
{
ServerConnection sconn = (ServerConnection) conn;
int okChannelMax = ok.getChannelMax();
+ int okMaxFrameSize = ok.getMaxFrameSize();
if (okChannelMax > getChannelMax())
{
@@ -246,6 +235,31 @@ public class ServerConnectionDelegate extends ServerDelegate
return;
}
+ if(okMaxFrameSize > getFrameMax())
+ {
+ LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " +
+ "client connectionTuneOk returned a frameMax (" + okMaxFrameSize +
+ ") above the server's offered limit (" + getFrameMax() +")");
+
+ //Due to the error we must forcefully close the connection without negotiation
+ sconn.getSender().close();
+ return;
+ }
+ else if(okMaxFrameSize > 0 && okMaxFrameSize < Constant.MIN_MAX_FRAME_SIZE)
+ {
+ LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " +
+ "client connectionTuneOk returned a frameMax (" + okMaxFrameSize +
+ ") below the minimum permitted size (" + Constant.MIN_MAX_FRAME_SIZE +")");
+
+ //Due to the error we must forcefully close the connection without negotiation
+ sconn.getSender().close();
+ return;
+ }
+ else if(okMaxFrameSize == 0)
+ {
+ okMaxFrameSize = getFrameMax();
+ }
+
final NetworkConnection networkConnection = sconn.getNetworkConnection();
if(ok.hasHeartbeat())
{
@@ -266,6 +280,8 @@ public class ServerConnectionDelegate extends ServerDelegate
}
setConnectionTuneOkChannelMax(sconn, okChannelMax);
+
+ conn.setMaxFrameSize(okMaxFrameSize);
}
@Override
@@ -279,6 +295,12 @@ public class ServerConnectionDelegate extends ServerDelegate
_maxNoOfChannels = channelMax;
}
+ @Override
+ protected int getFrameMax()
+ {
+ return _maximumFrameSize;
+ }
+
@Override public void sessionDetach(Connection conn, SessionDetach dtc)
{
// To ensure a clean detach, we stop any remaining subscriptions. Stop ensures
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 46a9430814..1c264e52c6 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -44,37 +44,38 @@ import javax.security.auth.Subject;
import javax.security.sasl.SaslServer;
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.common.ServerPropertyNames;
+import org.apache.qpid.framing.*;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.SessionModelListener;
-import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.SessionModelListener;
+import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterRegistry;
-import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
+import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -93,7 +94,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
// channels. This value must be of the form 2^x - 1.
private static final int CHANNEL_CACHE_SIZE = 0xff;
private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024;
- private final Port _port;
+ private final Port<?> _port;
+ private final long _creationTime;
private AMQShortString _contextKey;
@@ -123,7 +125,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private final AMQStateManager _stateManager;
- private AMQCodecFactory _codecFactory;
+ private AMQDecoder _decoder;
private SaslServer _saslServer;
@@ -166,12 +168,13 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private final ReentrantLock _receivedLock;
private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis());
- private final Broker _broker;
+ private final Broker<?> _broker;
private final Transport _transport;
private volatile boolean _closeWhenNoRoute;
private volatile boolean _stopped;
private long _readBytes;
+ private boolean _authenticated;
public AMQProtocolEngine(Broker broker,
final NetworkConnection network,
@@ -185,7 +188,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_maxNoOfChannels = broker.getConnection_sessionCountLimit();
_receivedLock = new ReentrantLock();
_stateManager = new AMQStateManager(broker, this);
- _codecFactory = new AMQCodecFactory(true, this);
+ _decoder = new AMQDecoder(true, this);
_connectionID = connectionId;
_logSubject = new ConnectionLogSubject(this);
@@ -210,6 +213,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID());
_messagesReceived = new StatisticsCounter("messages-received-" + getSessionID());
_dataReceived = new StatisticsCounter("data-received-" + getSessionID());
+ _creationTime = System.currentTimeMillis();
}
private <T> T runAsSubject(PrivilegedAction<T> action)
@@ -247,6 +251,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
public void setMaxFrameSize(long frameMax)
{
_maxFrameSize = frameMax;
+ _decoder.setMaxFrameSize(frameMax);
}
public long getMaxFrameSize()
@@ -277,7 +282,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
@Override
public Void run()
{
+
final long arrivalTime = System.currentTimeMillis();
+ if(!_authenticated &&
+ (arrivalTime - _creationTime) > _port.getContextValue(Long.class,
+ Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))
+ {
+ _logger.warn("Connection has taken more than "
+ + _port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)
+ + "ms to establish identity. Closing as possible DoS.");
+ getEventLogger().message(ConnectionMessages.IDLE_CLOSE());
+ closeProtocolSession();
+ }
_lastReceivedTime = arrivalTime;
_lastIoTime = arrivalTime;
_readBytes += msg.remaining();
@@ -285,7 +301,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_receivedLock.lock();
try
{
- final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
+ final ArrayList<AMQDataBlock> dataBlocks = _decoder.decodeBuffer(msg);
for (AMQDataBlock dataBlock : dataBlocks)
{
try
@@ -479,7 +495,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private synchronized void protocolInitiationReceived(ProtocolInitiation pi)
{
// this ensures the codec never checks for a PI message again
- (_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
+ _decoder.setExpectProtocolInitiation(false);
try
{
// Log incoming protocol negotiation request
@@ -1200,6 +1216,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
throw new IllegalArgumentException("authorizedSubject cannot be null");
}
+ _authenticated = true;
_authorizedSubject.getPrincipals().addAll(authorizedSubject.getPrincipals());
_authorizedSubject.getPrivateCredentials().addAll(authorizedSubject.getPrivateCredentials());
_authorizedSubject.getPublicCredentials().addAll(authorizedSubject.getPublicCredentials());
@@ -1273,7 +1290,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
public void readerIdle()
{
- // TODO - enforce disconnect on lack of inbound data
+ Subject.doAs(_authorizedSubject, new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ getEventLogger().message(ConnectionMessages.IDLE_CLOSE());
+ _network.close();
+ return null;
+ }
+ });
}
public synchronized void writerIdle()
@@ -1344,7 +1370,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
@Override
- public Port getPort()
+ public Port<?> getPort()
{
return _port;
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java
index a2b596e2b1..92552cb011 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java
@@ -33,7 +33,6 @@ import org.apache.qpid.framing.ConnectionSecureOkBody;
import org.apache.qpid.framing.ConnectionTuneBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
@@ -59,7 +58,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
public void methodReceived(AMQStateManager stateManager, ConnectionSecureOkBody body, int channelId) throws AMQException
{
- Broker broker = stateManager.getBroker();
+ Broker<?> broker = stateManager.getBroker();
AMQProtocolSession session = stateManager.getProtocolSession();
SubjectCreator subjectCreator = stateManager.getSubjectCreator();
@@ -99,7 +98,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
ConnectionTuneBody tuneBody =
methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
- BrokerProperties.FRAME_SIZE,
+ broker.getContextValue(Long.class, Broker.BROKER_FRAME_SIZE),
broker.getConnection_heartBeatDelay());
session.writeFrame(tuneBody.generateFrame(0));
session.setAuthorizedSubject(authResult.getSubject());
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java
index dc4f010a66..d6801c0fbc 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java
@@ -32,7 +32,6 @@ import org.apache.qpid.framing.ConnectionStartOkBody;
import org.apache.qpid.framing.ConnectionTuneBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
@@ -59,7 +58,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
public void methodReceived(AMQStateManager stateManager, ConnectionStartOkBody body, int channelId) throws AMQException
{
- Broker broker = stateManager.getBroker();
+ Broker<?> broker = stateManager.getBroker();
AMQProtocolSession session = stateManager.getProtocolSession();
_logger.info("SASL Mechanism selected: " + body.getMechanism());
@@ -113,7 +112,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
- BrokerProperties.FRAME_SIZE,
+ broker.getContextValue(Long.class,Broker.BROKER_FRAME_SIZE),
broker.getConnection_heartBeatDelay());
session.writeFrame(tuneBody.generateFrame(0));
break;
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java
index 5fddab6576..108c19dbaf 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java
@@ -22,8 +22,11 @@ package org.apache.qpid.server.protocol.v0_8.handler;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
@@ -49,8 +52,29 @@ public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<C
_logger.debug(body);
}
stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
+
session.initHeartbeats(body.getHeartbeat());
- session.setMaxFrameSize(body.getFrameMax());
+
+ long brokerFrameMax = stateManager.getBroker().getContextValue(Long.class,Broker.BROKER_FRAME_SIZE);
+ if(brokerFrameMax != 0 && body.getFrameMax() > brokerFrameMax)
+ {
+ throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR,
+ "Attempt to set max frame size to " + body.getFrameMax()
+ + "greater than the broker will allow: "
+ + brokerFrameMax,
+ body.getClazz(), body.getMethod(),
+ body.getMajor(), body.getMinor(),null);
+ }
+ else if(body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode())
+ {
+ throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR,
+ "Attempt to set max frame size to " + body.getFrameMax()
+ + "which is smaller than the specification definined minimum: "
+ + AMQConstant.FRAME_MIN_SIZE.getCode(),
+ body.getClazz(), body.getMethod(),
+ body.getMajor(), body.getMinor(),null);
+ }
+ session.setMaxFrameSize(body.getFrameMax()== 0l ? (brokerFrameMax == 0l ? 0xFFFFFFFFl : brokerFrameMax) : body.getFrameMax());
long maxChannelNumber = body.getChannelMax();
//0 means no implied limit, except that forced by protocol limitations (0xFFFF)
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java
index cb9295ac49..3c1f1dedc3 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java
@@ -51,12 +51,12 @@ public class AMQStateManager implements AMQMethodListener
{
private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
- private final Broker _broker;
+ private final Broker<?> _broker;
private final AMQProtocolSession _protocolSession;
/** The current state */
private AMQState _currentState;
- public AMQStateManager(Broker broker, AMQProtocolSession protocolSession)
+ public AMQStateManager(Broker<?> broker, AMQProtocolSession protocolSession)
{
_broker = broker;
_protocolSession = protocolSession;
@@ -69,7 +69,7 @@ public class AMQStateManager implements AMQMethodListener
*
* @return the Broker
*/
- public Broker getBroker()
+ public Broker<?> getBroker()
{
return _broker;
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index ffa65b2477..2a48ccb2df 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -62,7 +62,7 @@ import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class Connection_1_0 implements ConnectionEventListener, AMQConnectionModel<Connection_1_0,Session_1_0>
{
- private final Port _port;
+ private final Port<?> _port;
private final Broker _broker;
private final SubjectCreator _subjectCreator;
private VirtualHostImpl _vhost;
@@ -358,7 +358,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
}
@Override
- public Port getPort()
+ public Port<?> getPort()
{
return _port;
}
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js
index 11a79984b3..8cc3e76b58 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js
@@ -356,20 +356,20 @@ define(["dojo/_base/xhr",
that.vhostsGrid =
new UpdatableStore(that.brokerData.virtualhostnodes, query(".broker-virtualhosts")[0],
[
- { name: "Node Name", field: "name", width: "15%"},
+ { name: "Node Name", field: "name", width: "10%"},
{ name: "Node State", field: "state", width: "10%"},
{ name: "Node Type", field: "type", width: "10%"},
- { name: "Host Name", field: "_item", width: "15%",
+ { name: "Host Name", field: "_item", width: "10%",
formatter: function(item){
return item && item.virtualhosts? item.virtualhosts[0].name: "N/A";
}
},
- { name: "Host State", field: "_item", width: "10%",
+ { name: "Host State", field: "_item", width: "15%",
formatter: function(item){
return item && item.virtualhosts? item.virtualhosts[0].state: "N/A";
}
},
- { name: "Host Type", field: "_item", width: "10%",
+ { name: "Host Type", field: "_item", width: "15%",
formatter: function(item){
return item && item.virtualhosts? item.virtualhosts[0].type: "N/A";
}
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
index 025390b9ff..59e49f3302 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
@@ -333,7 +333,11 @@ define(["dojo/_base/xhr",
"bytesInRateUnits",
"msgOutRate",
"bytesOutRate",
- "bytesOutRateUnits"]);
+ "bytesOutRateUnits",
+ "queueFlowResumeSizeBytes",
+ "queueFlowControlSizeBytes",
+ "maximumDeliveryAttempts",
+ "oldestMessageAge"]);
@@ -413,6 +417,13 @@ define(["dojo/_base/xhr",
{
this.messageGroups.style.display = "none";
}
+
+ this.queueFlowControlSizeBytes.innerHTML = entities.encode(String(this.queueData[ "queueFlowControlSizeBytes" ]));
+ this.queueFlowResumeSizeBytes.innerHTML = entities.encode(String(this.queueData[ "queueFlowResumeSizeBytes" ]));
+
+ this.oldestMessageAge.innerHTML = entities.encode(String(this.queueData[ "oldestMessageAge" ] / 1000));
+ var maximumDeliveryAttempts = this.queueData[ "maximumDeliveryAttempts" ];
+ this.maximumDeliveryAttempts.innerHTML = entities.encode(String( maximumDeliveryAttempts == 0 ? "" : maximumDeliveryAttempts));
};
QueueUpdater.prototype.update = function()
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html
index 52903a80ea..961f60e214 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html
@@ -20,22 +20,81 @@
-->
<div class="queue">
<div data-dojo-type="dijit.TitlePane" data-dojo-props="title: 'Queue Attributes', open: true">
+
<div class="clear">
<div class="formLabel-labelCell">Name:</div>
- <div class="name"></div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">State:</div>
- <div class="state"></div>
+ <div class="name formValue-valueCell"></div>
</div>
<div class="clear">
- <div class="formLabel-labelCell">Durable:</div>
- <div class="durable"></div>
+ <div class="alignLeft">
+ <div class="clear">
+ <div class="formLabel-labelCell">Type:</div>
+ <div class="type formValue-valueCell"></div>
+ <div class="typeQualifier formValue-valueCell"></div>
+ </div>
+ <div class="clear">
+ <div class="formLabel-labelCell">State:</div>
+ <div class="state formValue-valueCell"></div>
+ </div>
+ <div class="clear">
+ <div class="formLabel-labelCell">Durable:</div>
+ <div class="durable formValue-valueCell"></div>
+ </div>
+ <div class="clear">
+ <div class="formLabel-labelCell">Lifespan:</div>
+ <div class="lifetimePolicy formValue-valueCell"></div>
+ </div>
+ <div class="clear">
+ <div class="formLabel-labelCell">Persist Messages:</div>
+ <div class="messageDurability formValue-valueCell"></div>
+ </div>
</div>
- <div class="clear">
- <div class="formLabel-labelCell">Persist Messages:</div>
- <div class="messageDurability"></div>
+ <div class="alignRight">
+ <div>
+ <div class="formLabel-labelCell">Inbound:</div>
+ <div class="formValue-valueCell">
+ <span class="msgInRate"></span>
+ <span> msg/s</span>
+ <span class="bytesInRate"></span>
+ <span class="bytesInRateUnits"></span>
+ </div>
+ </div>
+ <div>
+ <div class="formLabel-labelCell">Outbound:</div>
+ <div class="formValue-valueCell">
+ <span class="msgOutRate"></span>
+ <span> msg/s</span>
+ <span class="bytesOutRate"></span>
+ <span class="bytesOutRateUnits"></span>
+ </div>
+ </div>
+ <div>
+ <div class="formLabel-labelCell">Size:</div>
+ <div class="formValue-valueCell">
+ <span class="queueDepthMessages"></span>
+ <span> msgs</span>
+ <span class="queueDepthBytes">(</span>
+ <span class="queueDepthBytesUnits">)</span>
+ </div>
+ </div>
+ <div>
+ <div class="formLabel-labelCell">Pre-fetched:</div>
+ <div class="formValue-valueCell">
+ <span class="unacknowledgedMessages"></span>
+ <span> msgs</span>
+ <span class="unacknowledgedBytes">(</span>
+ <span class="unacknowledgedBytesUnits">)</span>
+ </div>
+ </div>
+ <div>
+ <div class="formLabel-labelCell">Oldest Message Age:</div>
+ <div class="formValue-valueCell">
+ <span class="oldestMessageAge"></span>
+ <span> secs</span>
+ </div>
+ </div>
</div>
+ <div class="clear"></div>
<div class="clear">
<div class="formLabel-labelCell">Enforced Max. Ttl(ms):</div>
<div class="maximumMessageTtl"></div>
@@ -53,55 +112,12 @@
<div class="owner"></div>
</div>
<div class="clear">
- <div class="formLabel-labelCell">Lifespan:</div>
- <div class="lifetimePolicy"></div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">Type:</div>
- <div>
- <span class="type"></span>
- <span class="typeQualifier"></span>
- </div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">Size:</div>
- <div>
- <span class="queueDepthMessages"></span>
- <span> msgs</span>
- <span class="queueDepthBytes">(</span>
- <span class="queueDepthBytesUnits">)</span>
- </div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">Pre-fetched:</div>
- <div>
- <span class="unacknowledgedMessages"></span>
- <span> msgs</span>
- <span class="unacknowledgedBytes">(</span>
- <span class="unacknowledgedBytesUnits">)</span>
- </div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">Inbound:</div>
- <div>
- <span class="msgInRate"></span>
- <span> msg/s</span>
- <span class="bytesInRate">(</span>
- <span class="bytesInRateUnits">)</span>
- </div>
+ <div class="formLabel-labelCell">Alternate Exchange:</div>
+ <div class="alternateExchange"></div>
</div>
<div class="clear">
- <div class="formLabel-labelCell">Outbound:</div>
- <div>
- <span class="msgOutRate"></span>
- <span> msg/s</span>
- <span class="bytesOutRate">(</span>
- <span class="bytesOutRateUnits">)</span>
- </div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">AlternateExchange:</div>
- <div class="alternateExchange"></div>
+ <div class="formLabel-labelCell">Maximum Delivery Attempts:</div>
+ <div class="maximumDeliveryAttempts"></div>
</div>
<div class="clear messageGroups">
<div class="clear">
@@ -133,6 +149,25 @@
<button data-dojo-type="dijit.form.Button" class="copyMessagesButton" type="button">Copy Messages</button>
</div>
<br/>
+ <div data-dojo-type="dijit.TitlePane" data-dojo-props="title: 'Flow Control Settings', open: false">
+ <div class="clear">
+ <div class="formLabel-labelCell">Capacity:</div>
+ <div>
+ <span class="queueFlowControlSizeBytes"></span>
+ <span>B</span>
+ </div>
+ </div>
+ <div class="clear">
+ <div class="formLabel-labelCell">Resume Capacity:</div>
+ <div>
+ <span class="queueFlowResumeSizeBytes"></span>
+ <span>B</span>
+ </div>
+ </div>
+ <div class="clear"></div>
+ </div>
+
+ <br/>
<div data-dojo-type="dijit.TitlePane" data-dojo-props="title: 'Alerting Thresholds', open: false">
<div class="clear">
<div class="formLabel-labelCell">Queue Depth:</div>