summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-08-25 14:48:20 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-08-25 14:48:20 +0000
commit2a7c8b3061fda47cc53ef997c339599dd2285395 (patch)
tree40334230aa105819bb4e1bc0ea7794e39050c64b
parent717bfa2e17d949bf0771ca14fb15bc99dd41f9fd (diff)
downloadqpid-python-2a7c8b3061fda47cc53ef997c339599dd2285395.tar.gz
Merging from trunk r1616861:1617235 in the Java tree
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.30@1620330 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java16
-rw-r--r--qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java2
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java2
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java43
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java60
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java60
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java26
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java6
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java4
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js8
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js13
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html149
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java13
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java20
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java12
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties14
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java59
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java38
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java27
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java11
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java30
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java67
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/FrameSizeObserver.java26
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java25
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java18
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java39
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java26
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java16
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java8
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/transport/MaxFrameSizeTest.java349
41 files changed, 916 insertions, 322 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
index 83a2054793..cacb04736c 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
@@ -71,7 +71,6 @@ import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironment
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicationGroupListener;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost;
import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl;
import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode;
import org.codehaus.jackson.map.ObjectMapper;
@@ -324,7 +323,13 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
finally
{
- stopEnvironment();
+ closeEnvironment();
+
+ // closing the environment does not cause a state change. Adjust the role
+ // so that our observers will see DETACHED rather than our previous role in the group.
+ ReplicatedEnvironment.State detached = ReplicatedEnvironment.State.DETACHED;
+ _lastReplicatedEnvironmentState.set(detached);
+ attributeSet(ROLE, _role, detached);
//Perhaps, having STOPPED operational logging could be sufficient. However, on START we still will be seeing 2 logs: ATTACHED and STARTED
getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DETACHED(getName(), getGroupName()));
@@ -345,7 +350,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
}
- private void stopEnvironment()
+ private void closeEnvironment()
{
ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
if (environmentFacade != null && _environmentFacade.compareAndSet(environmentFacade, null))
@@ -412,7 +417,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
finally
{
- stopEnvironment();
+ closeEnvironment();
getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DETACHED(getName(), getGroupName()));
}
}
@@ -847,6 +852,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
if (nodeState == null)
{
remoteNode.setRole(ReplicatedEnvironment.State.UNKNOWN.name());
+ remoteNode.setLastTransactionId(-1);
if (!remoteNode.isDetached())
{
getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DETACHED(remoteNode.getName(), getGroupName()));
@@ -909,7 +915,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
finally
{
- stopEnvironment();
+ closeEnvironment();
}
notifyStateChanged(state, State.ERRORED);
}
diff --git a/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js b/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js
index f927b81c9d..b5e12a664e 100644
--- a/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js
+++ b/qpid/java/bdbstore/src/main/java/resources/js/qpid/management/virtualhostnode/bdb_ha/show.js
@@ -99,7 +99,7 @@ define(["dojo/_base/xhr",
{ name: 'Role', field: 'role', width: '10%' },
{ name: 'Address', field: 'address', width: '35%' },
{ name: 'Join Time', field: 'joinTime', width: '25%', formatter: function(value){ return value ? UserPreferences.formatDateTime(value) : "";} },
- { name: 'Replication Transaction ID', field: 'lastKnownReplicationTransactionId', width: '20%' }
+ { name: 'Replication Transaction ID', field: 'lastKnownReplicationTransactionId', width: '20%', formatter: function(value){ return value > 0 ? value : "N/A";} }
],
null,
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java
index fb382a8ca9..ccda1e1fe1 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java
@@ -40,10 +40,6 @@ public class BrokerProperties
public static final String PROPERTY_DEFAULT_SUPPORTED_PROTOCOL_REPLY = "qpid.broker_default_supported_protocol_version_reply";
public static final String PROPERTY_DISABLED_FEATURES = "qpid.broker_disabled_features";
- private static final int DEFAULT_FRAME_SIZE = 65535;
- public static final String PROPERTY_FRAME_SIZE = "qpid.broker_frame_size";
- public static final int FRAME_SIZE = Integer.getInteger(PROPERTY_FRAME_SIZE, DEFAULT_FRAME_SIZE);
-
public static final String PROPERTY_BROKER_DEFAULT_AMQP_PROTOCOL_EXCLUDES = "qpid.broker_default_amqp_protocol_excludes";
public static final String PROPERTY_BROKER_DEFAULT_AMQP_PROTOCOL_INCLUDES = "qpid.broker_default_amqp_protocol_includes";
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index 7d49d0b85f..982ebb01c6 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -78,6 +78,11 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
@ManagedContextDefault(name = BROKER_FLOW_TO_DISK_THRESHOLD)
public static final long DEFAULT_FLOW_TO_DISK_THRESHOLD = (long)(0.4 * (double)Runtime.getRuntime().maxMemory());
+ String BROKER_FRAME_SIZE = "qpid.broker_frame_size";
+ @ManagedContextDefault(name = BROKER_FRAME_SIZE)
+ long DEFAULT_FRAME_SIZE = 65535;
+
+
@DerivedAttribute
String getBuildVersion();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
index 8dabd3eed6..e98ff1a79a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
@@ -38,6 +38,12 @@ public interface Port<X extends Port<X>> extends ConfiguredObject<X>
String KEY_STORE = "keyStore";
String TRUST_STORES = "trustStores";
+
+ String CONNECTION_MAXIMUM_AUTHENTICATION_DELAY = "connection.maximumAuthenticationDelay";
+
+ @ManagedContextDefault(name = CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)
+ long DEFAULT_MAX_CONNECTION_AUTHENTICATION_DELAY = 10000l;
+
// Attributes
@ManagedAttribute(defaultValue = "*")
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
index c9045999b9..5041e22104 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
@@ -89,7 +89,7 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends
long getLastIoTime();
- Port getPort();
+ Port<?> getPort();
Transport getTransport();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index fefd04e81d..0eabcd725e 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -293,7 +293,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
else
{
msgheader.limit(_header.remaining());
- msg.position(_header.remaining());
+ msg.position(msg.position()+_header.remaining());
}
_header.put(msgheader);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index e6afbc6e90..f614ff5847 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -584,7 +584,7 @@ public class MockConsumer implements ConsumerTarget
}
@Override
- public Port getPort()
+ public Port<?> getPort()
{
return null;
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index cefd1ee0b2..dc60a37a7f 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -20,27 +20,32 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+import javax.security.auth.Subject;
+
+import org.apache.log4j.Logger;
+
import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.transport.Constant;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.Disassembler;
import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.network.NetworkConnection;
-import javax.security.auth.Subject;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-
public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocolEngine
{
public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
+ private static final Logger _logger = Logger.getLogger(ProtocolEngine_0_10.class);
+
private NetworkConnection _network;
private long _readBytes;
@@ -87,7 +92,9 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
_network = network;
_connection.setNetworkConnection(network);
- _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE));
+ Disassembler disassembler = new Disassembler(wrapSender(sender), Constant.MIN_MAX_FRAME_SIZE);
+ _connection.setSender(disassembler);
+ _connection.addFrameSizeObserver(disassembler);
// FIXME Two log messages to maintain compatibility with earlier protocol versions
_connection.getEventLogger().message(ConnectionMessages.OPEN(null, "0-10", null, null, false, true, false, false));
@@ -154,6 +161,26 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
public void received(final ByteBuffer buf)
{
_lastReadTime = System.currentTimeMillis();
+ if(_connection.getAuthorizedPrincipal() == null &&
+ (_lastReadTime - _createTime) > _connection.getPort().getContextValue(Long.class,
+ Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY) )
+ {
+ Subject.doAs(_connection.getAuthorizedSubject(), new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+
+ _logger.warn("Connection has taken more than "
+ + _connection.getPort()
+ .getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)
+ + "ms to establish identity. Closing as possible DoS.");
+ _connection.getEventLogger().message(ConnectionMessages.IDLE_CLOSE());
+ _network.close();
+ return null;
+ }
+ });
+ }
super.received(buf);
_connection.receivedComplete();
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 2ad79ad980..8ddd04f51a 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -75,7 +75,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
private final long _connectionId;
private final Object _reference = new Object();
private VirtualHostImpl _virtualHost;
- private Port _port;
+ private Port<?> _port;
private AtomicLong _lastIoTime = new AtomicLong();
private boolean _blocking;
private Transport _transport;
@@ -189,12 +189,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
}
@Override
- public Port getPort()
+ public Port<?> getPort()
{
return _port;
}
- public void setPort(Port port)
+ public void setPort(Port<?> port)
{
_port = port;
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index 7751ff765d..bab2d802e8 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -50,21 +50,7 @@ import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.apache.qpid.transport.Binary;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionClose;
-import org.apache.qpid.transport.ConnectionCloseCode;
-import org.apache.qpid.transport.ConnectionOpen;
-import org.apache.qpid.transport.ConnectionOpenOk;
-import org.apache.qpid.transport.ConnectionStartOk;
-import org.apache.qpid.transport.ConnectionTuneOk;
-import org.apache.qpid.transport.ServerDelegate;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionAttach;
-import org.apache.qpid.transport.SessionDelegate;
-import org.apache.qpid.transport.SessionDetach;
-import org.apache.qpid.transport.SessionDetachCode;
-import org.apache.qpid.transport.SessionDetached;
+import org.apache.qpid.transport.*;
import org.apache.qpid.transport.network.NetworkConnection;
public class ServerConnectionDelegate extends ServerDelegate
@@ -76,15 +62,16 @@ public class ServerConnectionDelegate extends ServerDelegate
private int _maxNoOfChannels;
private Map<String,Object> _clientProperties;
private final SubjectCreator _subjectCreator;
+ private int _maximumFrameSize;
- public ServerConnectionDelegate(Broker broker, String localFQDN, SubjectCreator subjectCreator)
+ public ServerConnectionDelegate(Broker<?> broker, String localFQDN, SubjectCreator subjectCreator)
{
this(createConnectionProperties(broker), Collections.singletonList((Object)"en_US"), broker, localFQDN, subjectCreator);
}
private ServerConnectionDelegate(Map<String, Object> properties,
List<Object> locales,
- Broker broker,
+ Broker<?> broker,
String localFQDN,
SubjectCreator subjectCreator)
{
@@ -94,9 +81,10 @@ public class ServerConnectionDelegate extends ServerDelegate
_localFQDN = localFQDN;
_maxNoOfChannels = broker.getConnection_sessionCountLimit();
_subjectCreator = subjectCreator;
+ _maximumFrameSize = (int) Math.min(0xffffl, broker.getContextValue(Long.class, Broker.BROKER_FRAME_SIZE));
}
- private static List<String> getFeatures(Broker broker)
+ private static List<String> getFeatures(Broker<?> broker)
{
String brokerDisabledFeatures = System.getProperty(BrokerProperties.PROPERTY_DISABLED_FEATURES);
final List<String> features = new ArrayList<String>();
@@ -108,7 +96,7 @@ public class ServerConnectionDelegate extends ServerDelegate
return Collections.unmodifiableList(features);
}
- private static Map<String, Object> createConnectionProperties(final Broker broker)
+ private static Map<String, Object> createConnectionProperties(final Broker<?> broker)
{
final Map<String,Object> map = new HashMap<String,Object>();
// Federation tag is used by the client to identify the broker instance
@@ -234,6 +222,7 @@ public class ServerConnectionDelegate extends ServerDelegate
{
ServerConnection sconn = (ServerConnection) conn;
int okChannelMax = ok.getChannelMax();
+ int okMaxFrameSize = ok.getMaxFrameSize();
if (okChannelMax > getChannelMax())
{
@@ -246,6 +235,31 @@ public class ServerConnectionDelegate extends ServerDelegate
return;
}
+ if(okMaxFrameSize > getFrameMax())
+ {
+ LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " +
+ "client connectionTuneOk returned a frameMax (" + okMaxFrameSize +
+ ") above the server's offered limit (" + getFrameMax() +")");
+
+ //Due to the error we must forcefully close the connection without negotiation
+ sconn.getSender().close();
+ return;
+ }
+ else if(okMaxFrameSize > 0 && okMaxFrameSize < Constant.MIN_MAX_FRAME_SIZE)
+ {
+ LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " +
+ "client connectionTuneOk returned a frameMax (" + okMaxFrameSize +
+ ") below the minimum permitted size (" + Constant.MIN_MAX_FRAME_SIZE +")");
+
+ //Due to the error we must forcefully close the connection without negotiation
+ sconn.getSender().close();
+ return;
+ }
+ else if(okMaxFrameSize == 0)
+ {
+ okMaxFrameSize = getFrameMax();
+ }
+
final NetworkConnection networkConnection = sconn.getNetworkConnection();
if(ok.hasHeartbeat())
{
@@ -266,6 +280,8 @@ public class ServerConnectionDelegate extends ServerDelegate
}
setConnectionTuneOkChannelMax(sconn, okChannelMax);
+
+ conn.setMaxFrameSize(okMaxFrameSize);
}
@Override
@@ -279,6 +295,12 @@ public class ServerConnectionDelegate extends ServerDelegate
_maxNoOfChannels = channelMax;
}
+ @Override
+ protected int getFrameMax()
+ {
+ return _maximumFrameSize;
+ }
+
@Override public void sessionDetach(Connection conn, SessionDetach dtc)
{
// To ensure a clean detach, we stop any remaining subscriptions. Stop ensures
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 46a9430814..1c264e52c6 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -44,37 +44,38 @@ import javax.security.auth.Subject;
import javax.security.sasl.SaslServer;
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.common.ServerPropertyNames;
+import org.apache.qpid.framing.*;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.SessionModelListener;
-import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.SessionModelListener;
+import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterRegistry;
-import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
+import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -93,7 +94,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
// channels. This value must be of the form 2^x - 1.
private static final int CHANNEL_CACHE_SIZE = 0xff;
private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024;
- private final Port _port;
+ private final Port<?> _port;
+ private final long _creationTime;
private AMQShortString _contextKey;
@@ -123,7 +125,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private final AMQStateManager _stateManager;
- private AMQCodecFactory _codecFactory;
+ private AMQDecoder _decoder;
private SaslServer _saslServer;
@@ -166,12 +168,13 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private final ReentrantLock _receivedLock;
private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis());
- private final Broker _broker;
+ private final Broker<?> _broker;
private final Transport _transport;
private volatile boolean _closeWhenNoRoute;
private volatile boolean _stopped;
private long _readBytes;
+ private boolean _authenticated;
public AMQProtocolEngine(Broker broker,
final NetworkConnection network,
@@ -185,7 +188,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_maxNoOfChannels = broker.getConnection_sessionCountLimit();
_receivedLock = new ReentrantLock();
_stateManager = new AMQStateManager(broker, this);
- _codecFactory = new AMQCodecFactory(true, this);
+ _decoder = new AMQDecoder(true, this);
_connectionID = connectionId;
_logSubject = new ConnectionLogSubject(this);
@@ -210,6 +213,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID());
_messagesReceived = new StatisticsCounter("messages-received-" + getSessionID());
_dataReceived = new StatisticsCounter("data-received-" + getSessionID());
+ _creationTime = System.currentTimeMillis();
}
private <T> T runAsSubject(PrivilegedAction<T> action)
@@ -247,6 +251,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
public void setMaxFrameSize(long frameMax)
{
_maxFrameSize = frameMax;
+ _decoder.setMaxFrameSize(frameMax);
}
public long getMaxFrameSize()
@@ -277,7 +282,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
@Override
public Void run()
{
+
final long arrivalTime = System.currentTimeMillis();
+ if(!_authenticated &&
+ (arrivalTime - _creationTime) > _port.getContextValue(Long.class,
+ Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))
+ {
+ _logger.warn("Connection has taken more than "
+ + _port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)
+ + "ms to establish identity. Closing as possible DoS.");
+ getEventLogger().message(ConnectionMessages.IDLE_CLOSE());
+ closeProtocolSession();
+ }
_lastReceivedTime = arrivalTime;
_lastIoTime = arrivalTime;
_readBytes += msg.remaining();
@@ -285,7 +301,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_receivedLock.lock();
try
{
- final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
+ final ArrayList<AMQDataBlock> dataBlocks = _decoder.decodeBuffer(msg);
for (AMQDataBlock dataBlock : dataBlocks)
{
try
@@ -479,7 +495,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private synchronized void protocolInitiationReceived(ProtocolInitiation pi)
{
// this ensures the codec never checks for a PI message again
- (_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
+ _decoder.setExpectProtocolInitiation(false);
try
{
// Log incoming protocol negotiation request
@@ -1200,6 +1216,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
throw new IllegalArgumentException("authorizedSubject cannot be null");
}
+ _authenticated = true;
_authorizedSubject.getPrincipals().addAll(authorizedSubject.getPrincipals());
_authorizedSubject.getPrivateCredentials().addAll(authorizedSubject.getPrivateCredentials());
_authorizedSubject.getPublicCredentials().addAll(authorizedSubject.getPublicCredentials());
@@ -1273,7 +1290,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
public void readerIdle()
{
- // TODO - enforce disconnect on lack of inbound data
+ Subject.doAs(_authorizedSubject, new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ getEventLogger().message(ConnectionMessages.IDLE_CLOSE());
+ _network.close();
+ return null;
+ }
+ });
}
public synchronized void writerIdle()
@@ -1344,7 +1370,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
@Override
- public Port getPort()
+ public Port<?> getPort()
{
return _port;
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java
index a2b596e2b1..92552cb011 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java
@@ -33,7 +33,6 @@ import org.apache.qpid.framing.ConnectionSecureOkBody;
import org.apache.qpid.framing.ConnectionTuneBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
@@ -59,7 +58,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
public void methodReceived(AMQStateManager stateManager, ConnectionSecureOkBody body, int channelId) throws AMQException
{
- Broker broker = stateManager.getBroker();
+ Broker<?> broker = stateManager.getBroker();
AMQProtocolSession session = stateManager.getProtocolSession();
SubjectCreator subjectCreator = stateManager.getSubjectCreator();
@@ -99,7 +98,7 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
ConnectionTuneBody tuneBody =
methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
- BrokerProperties.FRAME_SIZE,
+ broker.getContextValue(Long.class, Broker.BROKER_FRAME_SIZE),
broker.getConnection_heartBeatDelay());
session.writeFrame(tuneBody.generateFrame(0));
session.setAuthorizedSubject(authResult.getSubject());
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java
index dc4f010a66..d6801c0fbc 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java
@@ -32,7 +32,6 @@ import org.apache.qpid.framing.ConnectionStartOkBody;
import org.apache.qpid.framing.ConnectionTuneBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
@@ -59,7 +58,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
public void methodReceived(AMQStateManager stateManager, ConnectionStartOkBody body, int channelId) throws AMQException
{
- Broker broker = stateManager.getBroker();
+ Broker<?> broker = stateManager.getBroker();
AMQProtocolSession session = stateManager.getProtocolSession();
_logger.info("SASL Mechanism selected: " + body.getMechanism());
@@ -113,7 +112,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
- BrokerProperties.FRAME_SIZE,
+ broker.getContextValue(Long.class,Broker.BROKER_FRAME_SIZE),
broker.getConnection_heartBeatDelay());
session.writeFrame(tuneBody.generateFrame(0));
break;
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java
index 5fddab6576..108c19dbaf 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java
@@ -22,8 +22,11 @@ package org.apache.qpid.server.protocol.v0_8.handler;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
@@ -49,8 +52,29 @@ public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<C
_logger.debug(body);
}
stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
+
session.initHeartbeats(body.getHeartbeat());
- session.setMaxFrameSize(body.getFrameMax());
+
+ long brokerFrameMax = stateManager.getBroker().getContextValue(Long.class,Broker.BROKER_FRAME_SIZE);
+ if(brokerFrameMax != 0 && body.getFrameMax() > brokerFrameMax)
+ {
+ throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR,
+ "Attempt to set max frame size to " + body.getFrameMax()
+ + "greater than the broker will allow: "
+ + brokerFrameMax,
+ body.getClazz(), body.getMethod(),
+ body.getMajor(), body.getMinor(),null);
+ }
+ else if(body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode())
+ {
+ throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR,
+ "Attempt to set max frame size to " + body.getFrameMax()
+ + "which is smaller than the specification definined minimum: "
+ + AMQConstant.FRAME_MIN_SIZE.getCode(),
+ body.getClazz(), body.getMethod(),
+ body.getMajor(), body.getMinor(),null);
+ }
+ session.setMaxFrameSize(body.getFrameMax()== 0l ? (brokerFrameMax == 0l ? 0xFFFFFFFFl : brokerFrameMax) : body.getFrameMax());
long maxChannelNumber = body.getChannelMax();
//0 means no implied limit, except that forced by protocol limitations (0xFFFF)
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java
index cb9295ac49..3c1f1dedc3 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java
@@ -51,12 +51,12 @@ public class AMQStateManager implements AMQMethodListener
{
private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
- private final Broker _broker;
+ private final Broker<?> _broker;
private final AMQProtocolSession _protocolSession;
/** The current state */
private AMQState _currentState;
- public AMQStateManager(Broker broker, AMQProtocolSession protocolSession)
+ public AMQStateManager(Broker<?> broker, AMQProtocolSession protocolSession)
{
_broker = broker;
_protocolSession = protocolSession;
@@ -69,7 +69,7 @@ public class AMQStateManager implements AMQMethodListener
*
* @return the Broker
*/
- public Broker getBroker()
+ public Broker<?> getBroker()
{
return _broker;
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index ffa65b2477..2a48ccb2df 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -62,7 +62,7 @@ import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class Connection_1_0 implements ConnectionEventListener, AMQConnectionModel<Connection_1_0,Session_1_0>
{
- private final Port _port;
+ private final Port<?> _port;
private final Broker _broker;
private final SubjectCreator _subjectCreator;
private VirtualHostImpl _vhost;
@@ -358,7 +358,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
}
@Override
- public Port getPort()
+ public Port<?> getPort()
{
return _port;
}
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js
index 11a79984b3..8cc3e76b58 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js
@@ -356,20 +356,20 @@ define(["dojo/_base/xhr",
that.vhostsGrid =
new UpdatableStore(that.brokerData.virtualhostnodes, query(".broker-virtualhosts")[0],
[
- { name: "Node Name", field: "name", width: "15%"},
+ { name: "Node Name", field: "name", width: "10%"},
{ name: "Node State", field: "state", width: "10%"},
{ name: "Node Type", field: "type", width: "10%"},
- { name: "Host Name", field: "_item", width: "15%",
+ { name: "Host Name", field: "_item", width: "10%",
formatter: function(item){
return item && item.virtualhosts? item.virtualhosts[0].name: "N/A";
}
},
- { name: "Host State", field: "_item", width: "10%",
+ { name: "Host State", field: "_item", width: "15%",
formatter: function(item){
return item && item.virtualhosts? item.virtualhosts[0].state: "N/A";
}
},
- { name: "Host Type", field: "_item", width: "10%",
+ { name: "Host Type", field: "_item", width: "15%",
formatter: function(item){
return item && item.virtualhosts? item.virtualhosts[0].type: "N/A";
}
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
index 025390b9ff..59e49f3302 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
@@ -333,7 +333,11 @@ define(["dojo/_base/xhr",
"bytesInRateUnits",
"msgOutRate",
"bytesOutRate",
- "bytesOutRateUnits"]);
+ "bytesOutRateUnits",
+ "queueFlowResumeSizeBytes",
+ "queueFlowControlSizeBytes",
+ "maximumDeliveryAttempts",
+ "oldestMessageAge"]);
@@ -413,6 +417,13 @@ define(["dojo/_base/xhr",
{
this.messageGroups.style.display = "none";
}
+
+ this.queueFlowControlSizeBytes.innerHTML = entities.encode(String(this.queueData[ "queueFlowControlSizeBytes" ]));
+ this.queueFlowResumeSizeBytes.innerHTML = entities.encode(String(this.queueData[ "queueFlowResumeSizeBytes" ]));
+
+ this.oldestMessageAge.innerHTML = entities.encode(String(this.queueData[ "oldestMessageAge" ] / 1000));
+ var maximumDeliveryAttempts = this.queueData[ "maximumDeliveryAttempts" ];
+ this.maximumDeliveryAttempts.innerHTML = entities.encode(String( maximumDeliveryAttempts == 0 ? "" : maximumDeliveryAttempts));
};
QueueUpdater.prototype.update = function()
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html
index 52903a80ea..961f60e214 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html
@@ -20,22 +20,81 @@
-->
<div class="queue">
<div data-dojo-type="dijit.TitlePane" data-dojo-props="title: 'Queue Attributes', open: true">
+
<div class="clear">
<div class="formLabel-labelCell">Name:</div>
- <div class="name"></div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">State:</div>
- <div class="state"></div>
+ <div class="name formValue-valueCell"></div>
</div>
<div class="clear">
- <div class="formLabel-labelCell">Durable:</div>
- <div class="durable"></div>
+ <div class="alignLeft">
+ <div class="clear">
+ <div class="formLabel-labelCell">Type:</div>
+ <div class="type formValue-valueCell"></div>
+ <div class="typeQualifier formValue-valueCell"></div>
+ </div>
+ <div class="clear">
+ <div class="formLabel-labelCell">State:</div>
+ <div class="state formValue-valueCell"></div>
+ </div>
+ <div class="clear">
+ <div class="formLabel-labelCell">Durable:</div>
+ <div class="durable formValue-valueCell"></div>
+ </div>
+ <div class="clear">
+ <div class="formLabel-labelCell">Lifespan:</div>
+ <div class="lifetimePolicy formValue-valueCell"></div>
+ </div>
+ <div class="clear">
+ <div class="formLabel-labelCell">Persist Messages:</div>
+ <div class="messageDurability formValue-valueCell"></div>
+ </div>
</div>
- <div class="clear">
- <div class="formLabel-labelCell">Persist Messages:</div>
- <div class="messageDurability"></div>
+ <div class="alignRight">
+ <div>
+ <div class="formLabel-labelCell">Inbound:</div>
+ <div class="formValue-valueCell">
+ <span class="msgInRate"></span>
+ <span> msg/s</span>
+ <span class="bytesInRate"></span>
+ <span class="bytesInRateUnits"></span>
+ </div>
+ </div>
+ <div>
+ <div class="formLabel-labelCell">Outbound:</div>
+ <div class="formValue-valueCell">
+ <span class="msgOutRate"></span>
+ <span> msg/s</span>
+ <span class="bytesOutRate"></span>
+ <span class="bytesOutRateUnits"></span>
+ </div>
+ </div>
+ <div>
+ <div class="formLabel-labelCell">Size:</div>
+ <div class="formValue-valueCell">
+ <span class="queueDepthMessages"></span>
+ <span> msgs</span>
+ <span class="queueDepthBytes">(</span>
+ <span class="queueDepthBytesUnits">)</span>
+ </div>
+ </div>
+ <div>
+ <div class="formLabel-labelCell">Pre-fetched:</div>
+ <div class="formValue-valueCell">
+ <span class="unacknowledgedMessages"></span>
+ <span> msgs</span>
+ <span class="unacknowledgedBytes">(</span>
+ <span class="unacknowledgedBytesUnits">)</span>
+ </div>
+ </div>
+ <div>
+ <div class="formLabel-labelCell">Oldest Message Age:</div>
+ <div class="formValue-valueCell">
+ <span class="oldestMessageAge"></span>
+ <span> secs</span>
+ </div>
+ </div>
</div>
+ <div class="clear"></div>
<div class="clear">
<div class="formLabel-labelCell">Enforced Max. Ttl(ms):</div>
<div class="maximumMessageTtl"></div>
@@ -53,55 +112,12 @@
<div class="owner"></div>
</div>
<div class="clear">
- <div class="formLabel-labelCell">Lifespan:</div>
- <div class="lifetimePolicy"></div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">Type:</div>
- <div>
- <span class="type"></span>
- <span class="typeQualifier"></span>
- </div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">Size:</div>
- <div>
- <span class="queueDepthMessages"></span>
- <span> msgs</span>
- <span class="queueDepthBytes">(</span>
- <span class="queueDepthBytesUnits">)</span>
- </div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">Pre-fetched:</div>
- <div>
- <span class="unacknowledgedMessages"></span>
- <span> msgs</span>
- <span class="unacknowledgedBytes">(</span>
- <span class="unacknowledgedBytesUnits">)</span>
- </div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">Inbound:</div>
- <div>
- <span class="msgInRate"></span>
- <span> msg/s</span>
- <span class="bytesInRate">(</span>
- <span class="bytesInRateUnits">)</span>
- </div>
+ <div class="formLabel-labelCell">Alternate Exchange:</div>
+ <div class="alternateExchange"></div>
</div>
<div class="clear">
- <div class="formLabel-labelCell">Outbound:</div>
- <div>
- <span class="msgOutRate"></span>
- <span> msg/s</span>
- <span class="bytesOutRate">(</span>
- <span class="bytesOutRateUnits">)</span>
- </div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">AlternateExchange:</div>
- <div class="alternateExchange"></div>
+ <div class="formLabel-labelCell">Maximum Delivery Attempts:</div>
+ <div class="maximumDeliveryAttempts"></div>
</div>
<div class="clear messageGroups">
<div class="clear">
@@ -133,6 +149,25 @@
<button data-dojo-type="dijit.form.Button" class="copyMessagesButton" type="button">Copy Messages</button>
</div>
<br/>
+ <div data-dojo-type="dijit.TitlePane" data-dojo-props="title: 'Flow Control Settings', open: false">
+ <div class="clear">
+ <div class="formLabel-labelCell">Capacity:</div>
+ <div>
+ <span class="queueFlowControlSizeBytes"></span>
+ <span>B</span>
+ </div>
+ </div>
+ <div class="clear">
+ <div class="formLabel-labelCell">Resume Capacity:</div>
+ <div>
+ <span class="queueFlowResumeSizeBytes"></span>
+ <span>B</span>
+ </div>
+ </div>
+ <div class="clear"></div>
+ </div>
+
+ <br/>
<div data-dojo-type="dijit.TitlePane" data-dojo-props="title: 'Alerting Thresholds', open: false">
<div class="clear">
<div class="formLabel-labelCell">Queue Depth:</div>
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
index e1a0e18262..d76fdf25e6 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.client.handler;
+import java.util.HashMap;
+import java.util.Map;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,9 +32,6 @@ import org.apache.qpid.client.state.AMQMethodNotImplementedException;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.framing.*;
-import java.util.HashMap;
-import java.util.Map;
-
public class ClientMethodDispatcherImpl implements MethodDispatcher
{
@@ -101,6 +101,10 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
}
DispatcherFactory factory = _dispatcherFactories.get(version);
+ if(factory == null)
+ {
+ throw new UnsupportedOperationException("The protocol version " + version + " is not supported");
+ }
return factory.createMethodDispatcher(session);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
index 617380e149..1f2df2026b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
@@ -72,6 +72,8 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener<Con
ConnectionTuneOkBody tuneOkBody = methodRegistry.createConnectionTuneOkBody(params.getChannelMax(),
params.getFrameMax(),
params.getHeartbeat());
+
+ session.setMaxFrameSize(params.getFrameMax());
// Be aware of possible changes to parameter order as versions change.
session.writeFrame(tuneOkBody.generateFrame(channelId));
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index c8ebb7f9c7..08f05cc8d6 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -38,7 +38,7 @@ import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
-import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
@@ -165,7 +165,7 @@ public class AMQProtocolHandler implements ProtocolEngine
/** Object to lock on when changing the latch */
private Object _failoverLatchChange = new Object();
- private AMQCodecFactory _codecFactory;
+ private AMQDecoder _decoder;
private ProtocolVersion _suggestedProtocolVersion;
@@ -188,7 +188,7 @@ public class AMQProtocolHandler implements ProtocolEngine
_connection = con;
_protocolSession = new AMQProtocolSession(this, _connection);
_stateManager = new AMQStateManager(_protocolSession);
- _codecFactory = new AMQCodecFactory(false, _protocolSession);
+ _decoder = new AMQDecoder(false, _protocolSession);
_failoverHandler = new FailoverHandler(this);
}
@@ -443,7 +443,7 @@ public class AMQProtocolHandler implements ProtocolEngine
_lastReadTime = System.currentTimeMillis();
try
{
- final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
+ final ArrayList<AMQDataBlock> dataBlocks = _decoder.decodeBuffer(msg);
// Decode buffer
int size = dataBlocks.size();
@@ -927,4 +927,9 @@ public class AMQProtocolHandler implements ProtocolEngine
{
_heartbeatListener.heartbeatReceived();
}
+
+ public void setMaxFrameSize(final long frameMax)
+ {
+ _decoder.setMaxFrameSize(frameMax == 0l ? 0xffffffffl : frameMax);
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 121715d439..2c69aa1b51 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -20,8 +20,16 @@
*/
package org.apache.qpid.client.protocol;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.jms.JMSException;
+import javax.security.sasl.SaslClient;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
@@ -47,13 +55,6 @@ import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
-import javax.jms.JMSException;
-import javax.security.sasl.SaslClient;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
/**
* Wrapper for protocol session that provides type-safe access to session attributes.
* <p>
@@ -543,4 +544,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
return _connectionStartServerProperties;
}
+
+ public void setMaxFrameSize(final long frameMax)
+ {
+ _protocolHandler.setMaxFrameSize(frameMax);
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java b/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java
index 6f99e53055..8d53438bb7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java
@@ -20,11 +20,6 @@
*/
package org.apache.qpid.client.security;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.util.FileUtils;
-
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
@@ -39,6 +34,11 @@ import java.util.Set;
import java.util.StringTokenizer;
import java.util.TreeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.util.FileUtils;
+
/**
* CallbackHandlerRegistry is a registry for call back handlers for user authentication and interaction during user
* authentication. It is capable of reading its configuration from a properties file containing call back handler
@@ -75,7 +75,7 @@ public class CallbackHandlerRegistry
/** Ordered collection of mechanisms for which callback handlers exist. */
private Collection<String> _mechanisms;
- private static final Collection<String> MECHS_THAT_NEED_USERPASS = Arrays.asList(new String [] {"PLAIN", "AMQPLAIN", "CRAM-MD5","CRAM-MD5-HASHED"});
+ private static final Collection<String> MECHS_THAT_NEED_USERPASS = Arrays.asList(new String [] {"PLAIN", "AMQPLAIN", "CRAM-MD5","CRAM-MD5-HASHED", "SCRAM-SHA-1", "SCRAM-SHA-256"});
static
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties b/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties
index 8f02ee2c38..b77fa033d6 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties
@@ -26,11 +26,11 @@
EXTERNAL.1=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
GSSAPI.2=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
-CRAM-MD5-HASHED.3=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler
-CRAM-MD5.4=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
-AMQPLAIN.5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
-PLAIN.6=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
-ANONYMOUS.7=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
-SCRAM-SHA-1.8=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
-SCRAM-SHA-256.9=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+SCRAM-SHA-256.3=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+SCRAM-SHA-1.4=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+CRAM-MD5-HASHED.5=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler
+CRAM-MD5.6=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+PLAIN.7=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+AMQPLAIN.8=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+ANONYMOUS.9=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
deleted file mode 100644
index 220e33724a..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.codec;
-
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
-/**
- * AMQCodecFactory is a Mina codec factory. It supplies the encoders and decoders need to read and write the bytes to
- * the wire.
- */
-public class AMQCodecFactory
-{
-
- /** Holds the protocol decoder. */
- private final AMQDecoder _frameDecoder;
-
- /**
- * Creates a new codec factory, specifiying whether it is expected that the first frame of data should be an
- * initiation. This is the case for the broker, which always expects to received the protocol initiation on a newly
- * connected client.
- *
- * @param expectProtocolInitiation <tt>true</tt> if the first frame received is going to be a protocol initiation
- * frame, <tt>false</tt> if it is going to be a standard AMQ data block.
- * @param session protocol session (connection)
- */
- public AMQCodecFactory(boolean expectProtocolInitiation, AMQVersionAwareProtocolSession session)
- {
- _frameDecoder = new AMQDecoder(expectProtocolInitiation, session);
- }
-
-
- /**
- * Gets the AMQP decoder.
- *
- * @return The AMQP decoder.
- */
- public AMQDecoder getDecoder()
- {
- return _frameDecoder;
- }
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
index 3ccd7e2031..ebecb7b483 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
@@ -20,6 +20,16 @@
*/
package org.apache.qpid.codec;
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQDataBlockDecoder;
import org.apache.qpid.framing.AMQFrameDecodingException;
@@ -31,16 +41,6 @@ import org.apache.qpid.framing.EncodingUtils;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.ListIterator;
-
/**
* AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a
* protocol initiation decoder. It is a cumulative decoder, which means that it can accumulate data to decode in the
@@ -66,6 +66,8 @@ public class AMQDecoder
private AMQMethodBodyFactory _bodyFactory;
+ private boolean _firstRead = true;
+
private List<ByteArrayInputStream> _remainingBufs = new ArrayList<ByteArrayInputStream>();
/**
@@ -94,6 +96,11 @@ public class AMQDecoder
_expectProtocolInitiation = expectProtocolInitiation;
}
+ public void setMaxFrameSize(final long frameMax)
+ {
+ _dataBlockDecoder.setMaxFrameSize(frameMax);
+ }
+
private class RemainingByteArrayInputStream extends InputStream
{
private int _currentListPos;
@@ -234,6 +241,17 @@ public class AMQDecoder
msg = new ByteArrayDataInput(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining());
}
+ // If this is the first read then we may be getting a protocol initiation back if we tried to negotiate
+ // an unsupported version
+ if(_firstRead && buf.hasRemaining())
+ {
+ _firstRead = false;
+ if(!_expectProtocolInitiation && buf.get(buf.position()) > 8)
+ {
+ _expectProtocolInitiation = true;
+ }
+ }
+
boolean enoughData = true;
while (enoughData)
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
index 9d5e654ad0..d00ddf4074 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
@@ -20,12 +20,13 @@
*/
package org.apache.qpid.framing;
+import java.io.IOException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.codec.MarkableDataInput;
-
-import java.io.IOException;
+import org.apache.qpid.protocol.AMQConstant;
public class AMQDataBlockDecoder
{
@@ -40,6 +41,7 @@ public class AMQDataBlockDecoder
}
private Logger _logger = LoggerFactory.getLogger(AMQDataBlockDecoder.class);
+ private long _maxFrameSize = AMQConstant.FRAME_MIN_SIZE.getCode();
public AMQDataBlockDecoder()
{ }
@@ -59,14 +61,17 @@ public class AMQDataBlockDecoder
// Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt()
final long bodySize = in.readInt() & 0xffffffffL;
-
+ if(bodySize > _maxFrameSize)
+ {
+ throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Incoming frame size of "+bodySize+" is larger than negotiated maximum of " + _maxFrameSize);
+ }
in.reset();
return (remainingAfterAttributes >= bodySize);
}
- public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, MarkableDataInput in)
+ public AMQFrame createAndPopulateFrame(BodyFactory methodBodyFactory, MarkableDataInput in)
throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
{
final byte type = in.readByte();
@@ -83,7 +88,7 @@ public class AMQDataBlockDecoder
if (bodyFactory == null)
{
- throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null);
+ throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Unsupported frame type: " + type);
}
final int channel = in.readUnsignedShort();
@@ -92,8 +97,8 @@ public class AMQDataBlockDecoder
// bodySize can be zero
if ((channel < 0) || (bodySize < 0))
{
- throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type + " channel = " + channel
- + " bodySize = " + bodySize, null);
+ throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Undecodable frame: type = " + type + " channel = " + channel
+ + " bodySize = " + bodySize);
}
AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
@@ -101,11 +106,15 @@ public class AMQDataBlockDecoder
byte marker = in.readByte();
if ((marker & 0xFF) != 0xCE)
{
- throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize
- + " type=" + type, null);
+ throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "End of frame marker not found. Read " + marker + " length=" + bodySize
+ + " type=" + type);
}
return frame;
}
+ public void setMaxFrameSize(final long maxFrameSize)
+ {
+ _maxFrameSize = maxFrameSize;
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java
index b0c92d9aab..b55a48067d 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java
@@ -20,17 +20,12 @@
*/
package org.apache.qpid.framing;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.TransportException;
-/**
- * AMQProtocolHeaderException indicates a format error in an AMQP frame header.
- * <p>
- * TODO Not an AMQP exception as no status code.
- */
-public class AMQProtocolHeaderException extends AMQException
+public class AMQProtocolHeaderException extends TransportException
{
public AMQProtocolHeaderException(String message, Throwable cause)
{
- super(null, message, cause);
+ super(message, cause);
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
index d48cd1754c..1866e1fd15 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
@@ -20,20 +20,21 @@
*/
package org.apache.qpid.transport;
-import org.apache.qpid.common.QpidProperties;
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.properties.ConnectionStartProperties;
-import org.apache.qpid.transport.util.Logger;
-
import static org.apache.qpid.transport.Connection.State.OPEN;
import static org.apache.qpid.transport.Connection.State.RESUMING;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import org.apache.qpid.common.QpidProperties;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.properties.ConnectionStartProperties;
+import org.apache.qpid.transport.util.Logger;
+
/**
* ClientDelegate
@@ -138,13 +139,24 @@ public class ClientDelegate extends ConnectionDelegate
int actualHeartbeatInterval = calculateHeartbeatInterval(heartbeatInterval,
tune.getHeartbeatMin(),
tune.getHeartbeatMax());
+ int maxFrameSize = tune.getMaxFrameSize();
+ int settingsMaxFrameSize = conn.getConnectionSettings().getMaxFrameSize();
+ if(maxFrameSize == 0 && settingsMaxFrameSize != 0 && settingsMaxFrameSize < 0xffff)
+ {
+ maxFrameSize = Math.max(Constant.MIN_MAX_FRAME_SIZE, settingsMaxFrameSize);
+ }
+ else if(maxFrameSize != 0 && settingsMaxFrameSize != 0)
+ {
+ maxFrameSize = Math.max(Constant.MIN_MAX_FRAME_SIZE, Math.min(maxFrameSize, settingsMaxFrameSize));
+ }
conn.connectionTuneOk(tune.getChannelMax(),
- tune.getMaxFrameSize(),
+ maxFrameSize,
actualHeartbeatInterval);
int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor);
conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor));
conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval);
+ conn.setMaxFrameSize(maxFrameSize == 0 ? 0xffff : maxFrameSize);
conn.setIdleTimeout(idleTimeout);
int channelMax = tune.getChannelMax();
@@ -183,7 +195,7 @@ public class ClientDelegate extends ConnectionDelegate
/**
* Currently the spec specified the min and max for heartbeat using secs
*/
- private int calculateHeartbeatInterval(int heartbeat,int min, int max)
+ int calculateHeartbeatInterval(int heartbeat,int min, int max)
{
int i = heartbeat;
if (i == 0)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 7c604e8e8e..44cb30e735 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -20,23 +20,12 @@
*/
package org.apache.qpid.transport;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.transport.network.*;
-import org.apache.qpid.transport.network.security.SecurityLayer;
-import org.apache.qpid.transport.network.security.SecurityLayerFactory;
-import org.apache.qpid.transport.util.Logger;
-import org.apache.qpid.transport.util.Waiter;
-import org.apache.qpid.util.Strings;
-
import static org.apache.qpid.transport.Connection.State.CLOSED;
import static org.apache.qpid.transport.Connection.State.CLOSING;
import static org.apache.qpid.transport.Connection.State.NEW;
import static org.apache.qpid.transport.Connection.State.OPEN;
import static org.apache.qpid.transport.Connection.State.OPENING;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslServer;
-
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -48,6 +37,23 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslServer;
+
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.transport.network.InputHandler;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.Transport;
+import org.apache.qpid.transport.network.TransportActivity;
+import org.apache.qpid.transport.network.security.SecurityLayer;
+import org.apache.qpid.transport.network.security.SecurityLayerFactory;
+import org.apache.qpid.transport.util.Logger;
+import org.apache.qpid.transport.util.Waiter;
+import org.apache.qpid.util.Strings;
+
/**
* Connection
@@ -71,7 +77,7 @@ public class Connection extends ConnectionInvoker
private long _lastSendTime;
private long _lastReadTime;
private NetworkConnection _networkConnection;
-
+ private FrameSizeObserver _frameSizeObserver;
public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
@@ -224,7 +230,9 @@ public class Connection extends ConnectionInvoker
securityLayer = SecurityLayerFactory.newInstance(getConnectionSettings());
OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10);
- Receiver<ByteBuffer> secureReceiver = securityLayer.receiver(new InputHandler(new Assembler(this)));
+ final InputHandler inputHandler = new InputHandler(new Assembler(this));
+ addFrameSizeObserver(inputHandler);
+ Receiver<ByteBuffer> secureReceiver = securityLayer.receiver(inputHandler);
if(secureReceiver instanceof ConnectionListener)
{
addConnectionListener((ConnectionListener)secureReceiver);
@@ -241,7 +249,9 @@ public class Connection extends ConnectionInvoker
{
addConnectionListener((ConnectionListener)secureSender);
}
- sender = new Disassembler(secureSender, settings.getMaxFrameSize());
+ Disassembler disassembler = new Disassembler(secureSender, Constant.MIN_MAX_FRAME_SIZE);
+ sender = disassembler;
+ addFrameSizeObserver(disassembler);
send(new ProtocolHeader(1, 0, 10));
@@ -809,4 +819,33 @@ public class Connection extends ConnectionInvoker
{
return _networkConnection;
}
+
+ public void setMaxFrameSize(final int maxFrameSize)
+ {
+ if(_frameSizeObserver != null)
+ {
+ _frameSizeObserver.setMaxFrameSize(maxFrameSize);
+ }
+ }
+
+ public void addFrameSizeObserver(final FrameSizeObserver frameSizeObserver)
+ {
+ if(_frameSizeObserver == null)
+ {
+ _frameSizeObserver = frameSizeObserver;
+ }
+ else
+ {
+ final FrameSizeObserver currentObserver = _frameSizeObserver;
+ _frameSizeObserver = new FrameSizeObserver()
+ {
+ @Override
+ public void setMaxFrameSize(final int frameSize)
+ {
+ currentObserver.setMaxFrameSize(frameSize);
+ frameSizeObserver.setMaxFrameSize(frameSize);
+ }
+ };
+ }
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/FrameSizeObserver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/FrameSizeObserver.java
new file mode 100644
index 0000000000..94d0080fbb
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/FrameSizeObserver.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+public interface FrameSizeObserver
+{
+ void setMaxFrameSize(int frameSize);
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
index 1e0d5b9698..82a677b8f7 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
@@ -20,18 +20,19 @@
*/
package org.apache.qpid.transport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import static org.apache.qpid.transport.Connection.State.OPEN;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* ServerDelegate
*/
@@ -136,12 +137,14 @@ public class ServerDelegate extends ConnectionDelegate
protected void tuneAuthorizedConnection(final Connection conn)
{
- conn.connectionTune
- (getChannelMax(),
- org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
- 0, getHeartbeatMax());
+ conn.connectionTune(getChannelMax(), getFrameMax(), 0, getHeartbeatMax());
}
-
+
+ protected int getFrameMax()
+ {
+ return org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE;
+ }
+
protected void secure(final Connection conn, final byte[] response)
{
final SaslServer ss = conn.getSaslServer();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
index 5a5de597c2..26e8f1850b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
@@ -20,17 +20,18 @@
*/
package org.apache.qpid.transport.network;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.transport.Binding;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionDelegate;
import org.apache.qpid.transport.ConnectionListener;
+import org.apache.qpid.transport.Constant;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.security.sasl.SASLReceiver;
import org.apache.qpid.transport.network.security.sasl.SASLSender;
-import java.nio.ByteBuffer;
-
/**
* ConnectionBinding
*
@@ -80,23 +81,26 @@ public abstract class ConnectionBinding
}
// XXX: hardcoded max-frame
- Disassembler dis = new Disassembler(sender, MAX_FRAME_SIZE);
+ Disassembler dis = new Disassembler(sender, Constant.MIN_MAX_FRAME_SIZE);
+ conn.addFrameSizeObserver(dis);
conn.setSender(dis);
return conn;
}
public Receiver<ByteBuffer> receiver(Connection conn)
{
- if (conn.getConnectionSettings() != null &&
+ final InputHandler inputHandler = new InputHandler(new Assembler(conn));
+ conn.addFrameSizeObserver(inputHandler);
+ if (conn.getConnectionSettings() != null &&
conn.getConnectionSettings().isUseSASLEncryption())
{
- SASLReceiver receiver = new SASLReceiver(new InputHandler(new Assembler(conn)));
- conn.addConnectionListener((ConnectionListener)receiver);
+ SASLReceiver receiver = new SASLReceiver(inputHandler);
+ conn.addConnectionListener(receiver);
return receiver;
}
else
{
- return new InputHandler(new Assembler(conn));
+ return inputHandler;
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
index fe437ecf93..a804cb2f9d 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
@@ -20,6 +20,17 @@
*/
package org.apache.qpid.transport.network;
+import static java.lang.Math.min;
+import static org.apache.qpid.transport.network.Frame.FIRST_FRAME;
+import static org.apache.qpid.transport.network.Frame.FIRST_SEG;
+import static org.apache.qpid.transport.network.Frame.HEADER_SIZE;
+import static org.apache.qpid.transport.network.Frame.LAST_FRAME;
+import static org.apache.qpid.transport.network.Frame.LAST_SEG;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.qpid.transport.FrameSizeObserver;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolDelegate;
@@ -31,24 +42,13 @@ import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.Struct;
import org.apache.qpid.transport.codec.BBEncoder;
-import static org.apache.qpid.transport.network.Frame.FIRST_FRAME;
-import static org.apache.qpid.transport.network.Frame.FIRST_SEG;
-import static org.apache.qpid.transport.network.Frame.HEADER_SIZE;
-import static org.apache.qpid.transport.network.Frame.LAST_FRAME;
-import static org.apache.qpid.transport.network.Frame.LAST_SEG;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-import static java.lang.Math.min;
-
/**
* Disassembler
*/
-public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void>
+public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void>, FrameSizeObserver
{
private final Sender<ByteBuffer> sender;
- private final int maxPayload;
+ private int maxPayload;
private final Object sendlock = new Object();
private final static ThreadLocal<BBEncoder> _encoder = new ThreadLocal<BBEncoder>()
{
@@ -60,11 +60,11 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega
public Disassembler(Sender<ByteBuffer> sender, int maxFrame)
{
+ this.sender = sender;
if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
{
throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
}
- this.sender = sender;
this.maxPayload = maxFrame - HEADER_SIZE;
}
@@ -255,4 +255,15 @@ public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelega
{
sender.setIdleTimeout(i);
}
+
+ @Override
+ public void setMaxFrameSize(final int maxFrame)
+ {
+ if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
+ {
+ throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
+ }
+ this.maxPayload = maxFrame - HEADER_SIZE;
+
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java
index 9416c4c9fa..e810d9e8ae 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java
@@ -20,12 +20,12 @@
*/
package org.apache.qpid.transport.network;
-import org.apache.qpid.transport.SegmentType;
-
import static org.apache.qpid.transport.util.Functions.str;
import java.nio.ByteBuffer;
+import org.apache.qpid.transport.SegmentType;
+
/**
* Frame
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
index 86e05db818..758c2e1eda 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
@@ -20,11 +20,6 @@
*/
package org.apache.qpid.transport.network;
-import org.apache.qpid.transport.ProtocolError;
-import org.apache.qpid.transport.ProtocolHeader;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.SegmentType;
-
import static org.apache.qpid.transport.network.InputHandler.State.ERROR;
import static org.apache.qpid.transport.network.InputHandler.State.FRAME_BODY;
import static org.apache.qpid.transport.network.InputHandler.State.FRAME_HDR;
@@ -34,6 +29,13 @@ import static org.apache.qpid.transport.util.Functions.str;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import org.apache.qpid.transport.Constant;
+import org.apache.qpid.transport.FrameSizeObserver;
+import org.apache.qpid.transport.ProtocolError;
+import org.apache.qpid.transport.ProtocolHeader;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.SegmentType;
+
/**
* InputHandler
@@ -41,15 +43,17 @@ import java.nio.ByteOrder;
* @author Rafael H. Schloming
*/
-public class InputHandler implements Receiver<ByteBuffer>
+public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver
{
+ private int _maxFrameSize = Constant.MIN_MAX_FRAME_SIZE;
+
public enum State
{
PROTO_HDR,
FRAME_HDR,
FRAME_BODY,
- ERROR;
+ ERROR
}
private final Receiver<NetworkEvent> receiver;
@@ -83,6 +87,11 @@ public class InputHandler implements Receiver<ByteBuffer>
this(receiver, PROTO_HDR);
}
+ public void setMaxFrameSize(final int maxFrameSize)
+ {
+ _maxFrameSize = maxFrameSize;
+ }
+
private void error(String fmt, Object ... args)
{
receiver.received(new ProtocolError(Frame.L1, fmt, args));
@@ -158,7 +167,8 @@ public class InputHandler implements Receiver<ByteBuffer>
type = SegmentType.get(input.get(pos + 1));
int size = (0xFFFF & input.getShort(pos + 2));
size -= Frame.HEADER_SIZE;
- if (size < 0 || size > (64*1024 - 12))
+ _maxFrameSize = 64 * 1024;
+ if (size < 0 || size > (_maxFrameSize - 12))
{
error("bad frame size: %d", size);
return ERROR;
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java b/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
index cb820b333b..cd810f6b3d 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
@@ -21,6 +21,12 @@
package org.apache.qpid.codec;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
import junit.framework.TestCase;
import org.apache.qpid.framing.AMQDataBlock;
@@ -29,23 +35,15 @@ import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.framing.AMQProtocolVersionException;
import org.apache.qpid.framing.HeartbeatBody;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-
public class AMQDecoderTest extends TestCase
{
- private AMQCodecFactory _factory;
private AMQDecoder _decoder;
public void setUp()
{
- _factory = new AMQCodecFactory(false, null);
- _decoder = _factory.getDecoder();
+ _decoder = new AMQDecoder(false, null);
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
index de2b594211..c771e84f52 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.protocol;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.net.InetSocketAddress;
@@ -32,6 +34,7 @@ import java.util.Set;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -52,6 +55,7 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase
_broker = BrokerTestHelper.createBrokerMock();
when(_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST)).thenReturn("default");
when(_broker.getDefaultVirtualHost()).thenReturn("default");
+ when(_broker.getContextValue(eq(Long.class), eq(Broker.BROKER_FRAME_SIZE))).thenReturn(0xffffl);
}
@@ -149,8 +153,10 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase
{
Set<Protocol> protocols = getAllAMQPProtocols();
+ Port<?> port = mock(Port.class);
+ when(port.getContextValue(eq(Long.class),eq(Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))).thenReturn(10000l);
MultiVersionProtocolEngineFactory factory =
- new MultiVersionProtocolEngineFactory(_broker, null, false, false, protocols, null, null,
+ new MultiVersionProtocolEngineFactory(_broker, null, false, false, protocols, null, port,
org.apache.qpid.server.model.Transport.TCP);
//create a dummy to retrieve the 'current' ID number
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/transport/MaxFrameSizeTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/transport/MaxFrameSizeTest.java
new file mode 100644
index 0000000000..322b971487
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/transport/MaxFrameSizeTest.java
@@ -0,0 +1,349 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import javax.naming.NamingException;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import org.apache.qpid.codec.MarkableDataInput;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlockDecoder;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQProtocolVersionException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BodyFactory;
+import org.apache.qpid.framing.ByteArrayDataInput;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionStartOkBody;
+import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.amqp_0_91.ConnectionStartOkBodyImpl;
+import org.apache.qpid.framing.amqp_0_91.ConnectionTuneOkBodyImpl;
+import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_8;
+import org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_9;
+import org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_9_1;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+
+public class MaxFrameSizeTest extends QpidBrokerTestCase
+{
+
+ @Override
+ public void setUp() throws Exception
+ {
+ // don't call super.setup() as we want a change to set stuff up before the broker starts
+ // super.setUp();
+ }
+
+ public void testTooSmallFrameSize() throws Exception
+ {
+
+ getBrokerConfiguration().setObjectAttribute(AuthenticationProvider.class,
+ TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER,
+ "secureOnlyMechanisms",
+ "[]");
+ super.setUp();
+
+ if(isBroker010())
+ {
+ Connection conn = new Connection();
+ final ConnectionSettings settings = new ConnectionSettings();
+ BrokerDetails brokerDetails = getConnectionFactory().getConnectionURL().getAllBrokerDetails().get(0);
+ settings.setHost(brokerDetails.getHost());
+ settings.setPort(brokerDetails.getPort());
+ settings.setUsername(GUEST_USERNAME);
+ settings.setPassword(GUEST_PASSWORD);
+ final ConnectionDelegate clientDelegate = new TestClientDelegate(settings, 1024);
+ conn.setConnectionDelegate(clientDelegate);
+ try
+ {
+ conn.connect(settings);
+ fail("Connection should not be possible with a frame size < " + Constant.MIN_MAX_FRAME_SIZE);
+ }
+ catch(ConnectionException e)
+ {
+ // pass
+ }
+
+ }
+ else
+ {
+ doAMQP08test(1024, new ResultEvaluator()
+ {
+
+ @Override
+ public void evaluate(final Socket socket, final List<AMQFrame> frames)
+ {
+ if(!socket.isClosed())
+ {
+ AMQFrame lastFrame = frames.get(frames.size() - 1);
+ assertTrue("Connection should not be possible with a frame size < " + Constant.MIN_MAX_FRAME_SIZE, lastFrame.getBodyFrame() instanceof ConnectionCloseBody);
+ }
+ }
+ });
+ }
+ }
+
+
+ public void testTooLargeFrameSize() throws Exception
+ {
+ getBrokerConfiguration().setObjectAttribute(AuthenticationProvider.class,
+ TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER,
+ "secureOnlyMechanisms",
+ "[]");
+ setTestSystemProperty(Broker.BROKER_FRAME_SIZE, "8192");
+ super.setUp();
+ if(isBroker010())
+ {
+ Connection conn = new Connection();
+ final ConnectionSettings settings = new ConnectionSettings();
+ BrokerDetails brokerDetails = getConnectionFactory().getConnectionURL().getAllBrokerDetails().get(0);
+ settings.setHost(brokerDetails.getHost());
+ settings.setPort(brokerDetails.getPort());
+ settings.setUsername(GUEST_USERNAME);
+ settings.setPassword(GUEST_PASSWORD);
+ final ConnectionDelegate clientDelegate = new TestClientDelegate(settings, 0xffff);
+ conn.setConnectionDelegate(clientDelegate);
+ try
+ {
+ conn.connect(settings);
+ fail("Connection should not be possible with a frame size larger than the broker requested");
+ }
+ catch(ConnectionException e)
+ {
+ // pass
+ }
+
+ }
+ else
+ {
+ doAMQP08test(10000, new ResultEvaluator()
+ {
+
+ @Override
+ public void evaluate(final Socket socket, final List<AMQFrame> frames)
+ {
+ if(!socket.isClosed())
+ {
+ AMQFrame lastFrame = frames.get(frames.size() - 1);
+ assertTrue("Connection should not be possible with a frame size larger than the broker requested", lastFrame.getBodyFrame() instanceof ConnectionCloseBody);
+ }
+ }
+ });
+ }
+ }
+
+ private static interface ResultEvaluator
+ {
+ void evaluate(Socket socket, List<AMQFrame> frames);
+ }
+
+ private void doAMQP08test(int frameSize, ResultEvaluator evaluator)
+ throws NamingException, IOException, AMQFrameDecodingException, AMQProtocolVersionException
+ {
+ BrokerDetails brokerDetails = getConnectionFactory().getConnectionURL().getAllBrokerDetails().get(0);
+
+ Socket socket = new Socket(brokerDetails.getHost(), brokerDetails.getPort());
+ socket.setTcpNoDelay(true);
+ OutputStream os = socket.getOutputStream();
+
+ byte[] protocolHeader;
+ Protocol protocol = getBrokerProtocol();
+ switch(protocol)
+ {
+ case AMQP_0_8:
+ protocolHeader = (ProtocolEngineCreator_0_8.getInstance().getHeaderIdentifier());
+ break;
+ case AMQP_0_9:
+ protocolHeader = (ProtocolEngineCreator_0_9.getInstance().getHeaderIdentifier());
+ break;
+ case AMQP_0_9_1:
+ protocolHeader = (ProtocolEngineCreator_0_9_1.getInstance().getHeaderIdentifier());
+ break;
+ default:
+ throw new RuntimeException("Unexpected Protocol Version: " + protocol);
+ }
+ os.write(protocolHeader);
+ InputStream is = socket.getInputStream();
+
+ final byte[] response = new byte[2+GUEST_USERNAME.length()+GUEST_PASSWORD.length()];
+ int i = 1;
+ for(byte b : GUEST_USERNAME.getBytes(StandardCharsets.US_ASCII))
+ {
+ response[i++] = b;
+ }
+ i++;
+ for(byte b : GUEST_PASSWORD.getBytes(StandardCharsets.US_ASCII))
+ {
+ response[i++] = b;
+ }
+
+ ConnectionStartOkBody startOK = new ConnectionStartOkBodyImpl(new FieldTable(), AMQShortString.valueOf("PLAIN"), response, AMQShortString.valueOf("en_US"));
+
+ DataOutputStream dos = new DataOutputStream(os);
+ new AMQFrame(0, startOK).writePayload(dos);
+ dos.flush();
+ ConnectionTuneOkBody tuneOk = new ConnectionTuneOkBodyImpl(256, frameSize, 0);
+ new AMQFrame(0, tuneOk).writePayload(dos);
+ dos.flush();
+ socket.setSoTimeout(5000);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ byte[] buffer = new byte[1024];
+ int size;
+ while((size = is.read(buffer)) > 0)
+ {
+ baos.write(buffer,0,size);
+ }
+
+ byte[] serverData = baos.toByteArray();
+ ByteArrayDataInput badi = new ByteArrayDataInput(serverData);
+ AMQDataBlockDecoder datablockDecoder = new AMQDataBlockDecoder();
+ final MethodRegistry_0_91 methodRegistry_0_91 = new MethodRegistry_0_91();
+ BodyFactory methodBodyFactory = new BodyFactory()
+ {
+ @Override
+ public AMQBody createBody(final MarkableDataInput in, final long bodySize)
+ throws AMQFrameDecodingException, IOException
+ {
+ return methodRegistry_0_91.convertToBody(in, bodySize);
+ }
+ };
+
+ List<AMQFrame> frames = new ArrayList<>();
+ while (datablockDecoder.decodable(badi))
+ {
+ frames.add(datablockDecoder.createAndPopulateFrame(methodBodyFactory, badi));
+ }
+
+ evaluator.evaluate(socket, frames);
+ }
+
+ private static class TestClientDelegate extends ClientDelegate
+ {
+
+ private final int _maxFrameSize;
+
+ public TestClientDelegate(final ConnectionSettings settings, final int maxFrameSize)
+ {
+ super(settings);
+ _maxFrameSize = maxFrameSize;
+ }
+
+ @Override
+ protected SaslClient createSaslClient(final List<Object> brokerMechs) throws ConnectionException, SaslException
+ {
+ final CallbackHandler handler = new CallbackHandler()
+ {
+ @Override
+ public void handle(final Callback[] callbacks) throws IOException, UnsupportedCallbackException
+ {
+ for (int i = 0; i < callbacks.length; i++)
+ {
+ Callback cb = callbacks[i];
+ if (cb instanceof NameCallback)
+ {
+ ((NameCallback)cb).setName(GUEST_USERNAME);
+ }
+ else if (cb instanceof PasswordCallback)
+ {
+ ((PasswordCallback)cb).setPassword(GUEST_PASSWORD.toCharArray());
+ }
+ else
+ {
+ throw new UnsupportedCallbackException(cb);
+ }
+ }
+
+ }
+ };
+ String[] selectedMechs = {};
+ for(String mech : new String[] { "ANONYMOUS", "PLAIN", "CRAM-MD5", "SCRAM-SHA-1", "SCRAM-SHA-256"})
+ {
+ if(brokerMechs.contains(mech))
+ {
+ selectedMechs = new String[] {mech};
+ break;
+ }
+ }
+
+
+ return Sasl.createSaslClient(selectedMechs,
+ null,
+ getConnectionSettings().getSaslProtocol(),
+ getConnectionSettings().getSaslServerName(),
+ Collections.<String,Object>emptyMap(),
+ handler);
+
+ }
+
+ @Override
+ public void connectionTune(Connection conn, ConnectionTune tune)
+ {
+ int heartbeatInterval = getConnectionSettings().getHeartbeatInterval010();
+ float heartbeatTimeoutFactor = getConnectionSettings().getHeartbeatTimeoutFactor();
+ int actualHeartbeatInterval = calculateHeartbeatInterval(heartbeatInterval,
+ tune.getHeartbeatMin(),
+ tune.getHeartbeatMax());
+
+ conn.connectionTuneOk(tune.getChannelMax(),
+ _maxFrameSize,
+ actualHeartbeatInterval);
+
+ int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor);
+ conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor));
+ conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval);
+ conn.setMaxFrameSize(_maxFrameSize);
+
+
+ conn.setIdleTimeout(idleTimeout);
+
+ int channelMax = tune.getChannelMax();
+ conn.setChannelMax(channelMax == 0 ? Connection.MAX_CHANNEL_MAX : channelMax);
+
+ conn.connectionOpen(getConnectionSettings().getVhost(), null, Option.INSIST);
+ }
+
+ }
+}