diff options
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java')
-rw-r--r-- | qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java | 60 |
1 files changed, 41 insertions, 19 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index 7751ff765d..bab2d802e8 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -50,21 +50,7 @@ import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import org.apache.qpid.transport.Binary; -import org.apache.qpid.transport.Connection; -import org.apache.qpid.transport.ConnectionClose; -import org.apache.qpid.transport.ConnectionCloseCode; -import org.apache.qpid.transport.ConnectionOpen; -import org.apache.qpid.transport.ConnectionOpenOk; -import org.apache.qpid.transport.ConnectionStartOk; -import org.apache.qpid.transport.ConnectionTuneOk; -import org.apache.qpid.transport.ServerDelegate; -import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.SessionAttach; -import org.apache.qpid.transport.SessionDelegate; -import org.apache.qpid.transport.SessionDetach; -import org.apache.qpid.transport.SessionDetachCode; -import org.apache.qpid.transport.SessionDetached; +import org.apache.qpid.transport.*; import org.apache.qpid.transport.network.NetworkConnection; public class ServerConnectionDelegate extends ServerDelegate @@ -76,15 +62,16 @@ public class ServerConnectionDelegate extends ServerDelegate private int _maxNoOfChannels; private Map<String,Object> _clientProperties; private final SubjectCreator _subjectCreator; + private int _maximumFrameSize; - public ServerConnectionDelegate(Broker broker, String localFQDN, SubjectCreator subjectCreator) + public ServerConnectionDelegate(Broker<?> broker, String localFQDN, SubjectCreator subjectCreator) { this(createConnectionProperties(broker), Collections.singletonList((Object)"en_US"), broker, localFQDN, subjectCreator); } private ServerConnectionDelegate(Map<String, Object> properties, List<Object> locales, - Broker broker, + Broker<?> broker, String localFQDN, SubjectCreator subjectCreator) { @@ -94,9 +81,10 @@ public class ServerConnectionDelegate extends ServerDelegate _localFQDN = localFQDN; _maxNoOfChannels = broker.getConnection_sessionCountLimit(); _subjectCreator = subjectCreator; + _maximumFrameSize = (int) Math.min(0xffffl, broker.getContextValue(Long.class, Broker.BROKER_FRAME_SIZE)); } - private static List<String> getFeatures(Broker broker) + private static List<String> getFeatures(Broker<?> broker) { String brokerDisabledFeatures = System.getProperty(BrokerProperties.PROPERTY_DISABLED_FEATURES); final List<String> features = new ArrayList<String>(); @@ -108,7 +96,7 @@ public class ServerConnectionDelegate extends ServerDelegate return Collections.unmodifiableList(features); } - private static Map<String, Object> createConnectionProperties(final Broker broker) + private static Map<String, Object> createConnectionProperties(final Broker<?> broker) { final Map<String,Object> map = new HashMap<String,Object>(); // Federation tag is used by the client to identify the broker instance @@ -234,6 +222,7 @@ public class ServerConnectionDelegate extends ServerDelegate { ServerConnection sconn = (ServerConnection) conn; int okChannelMax = ok.getChannelMax(); + int okMaxFrameSize = ok.getMaxFrameSize(); if (okChannelMax > getChannelMax()) { @@ -246,6 +235,31 @@ public class ServerConnectionDelegate extends ServerDelegate return; } + if(okMaxFrameSize > getFrameMax()) + { + LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " + + "client connectionTuneOk returned a frameMax (" + okMaxFrameSize + + ") above the server's offered limit (" + getFrameMax() +")"); + + //Due to the error we must forcefully close the connection without negotiation + sconn.getSender().close(); + return; + } + else if(okMaxFrameSize > 0 && okMaxFrameSize < Constant.MIN_MAX_FRAME_SIZE) + { + LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " + + "client connectionTuneOk returned a frameMax (" + okMaxFrameSize + + ") below the minimum permitted size (" + Constant.MIN_MAX_FRAME_SIZE +")"); + + //Due to the error we must forcefully close the connection without negotiation + sconn.getSender().close(); + return; + } + else if(okMaxFrameSize == 0) + { + okMaxFrameSize = getFrameMax(); + } + final NetworkConnection networkConnection = sconn.getNetworkConnection(); if(ok.hasHeartbeat()) { @@ -266,6 +280,8 @@ public class ServerConnectionDelegate extends ServerDelegate } setConnectionTuneOkChannelMax(sconn, okChannelMax); + + conn.setMaxFrameSize(okMaxFrameSize); } @Override @@ -279,6 +295,12 @@ public class ServerConnectionDelegate extends ServerDelegate _maxNoOfChannels = channelMax; } + @Override + protected int getFrameMax() + { + return _maximumFrameSize; + } + @Override public void sessionDetach(Connection conn, SessionDetach dtc) { // To ensure a clean detach, we stop any remaining subscriptions. Stop ensures |