summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java')
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java60
1 files changed, 41 insertions, 19 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index 7751ff765d..bab2d802e8 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -50,21 +50,7 @@ import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.apache.qpid.transport.Binary;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionClose;
-import org.apache.qpid.transport.ConnectionCloseCode;
-import org.apache.qpid.transport.ConnectionOpen;
-import org.apache.qpid.transport.ConnectionOpenOk;
-import org.apache.qpid.transport.ConnectionStartOk;
-import org.apache.qpid.transport.ConnectionTuneOk;
-import org.apache.qpid.transport.ServerDelegate;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionAttach;
-import org.apache.qpid.transport.SessionDelegate;
-import org.apache.qpid.transport.SessionDetach;
-import org.apache.qpid.transport.SessionDetachCode;
-import org.apache.qpid.transport.SessionDetached;
+import org.apache.qpid.transport.*;
import org.apache.qpid.transport.network.NetworkConnection;
public class ServerConnectionDelegate extends ServerDelegate
@@ -76,15 +62,16 @@ public class ServerConnectionDelegate extends ServerDelegate
private int _maxNoOfChannels;
private Map<String,Object> _clientProperties;
private final SubjectCreator _subjectCreator;
+ private int _maximumFrameSize;
- public ServerConnectionDelegate(Broker broker, String localFQDN, SubjectCreator subjectCreator)
+ public ServerConnectionDelegate(Broker<?> broker, String localFQDN, SubjectCreator subjectCreator)
{
this(createConnectionProperties(broker), Collections.singletonList((Object)"en_US"), broker, localFQDN, subjectCreator);
}
private ServerConnectionDelegate(Map<String, Object> properties,
List<Object> locales,
- Broker broker,
+ Broker<?> broker,
String localFQDN,
SubjectCreator subjectCreator)
{
@@ -94,9 +81,10 @@ public class ServerConnectionDelegate extends ServerDelegate
_localFQDN = localFQDN;
_maxNoOfChannels = broker.getConnection_sessionCountLimit();
_subjectCreator = subjectCreator;
+ _maximumFrameSize = (int) Math.min(0xffffl, broker.getContextValue(Long.class, Broker.BROKER_FRAME_SIZE));
}
- private static List<String> getFeatures(Broker broker)
+ private static List<String> getFeatures(Broker<?> broker)
{
String brokerDisabledFeatures = System.getProperty(BrokerProperties.PROPERTY_DISABLED_FEATURES);
final List<String> features = new ArrayList<String>();
@@ -108,7 +96,7 @@ public class ServerConnectionDelegate extends ServerDelegate
return Collections.unmodifiableList(features);
}
- private static Map<String, Object> createConnectionProperties(final Broker broker)
+ private static Map<String, Object> createConnectionProperties(final Broker<?> broker)
{
final Map<String,Object> map = new HashMap<String,Object>();
// Federation tag is used by the client to identify the broker instance
@@ -234,6 +222,7 @@ public class ServerConnectionDelegate extends ServerDelegate
{
ServerConnection sconn = (ServerConnection) conn;
int okChannelMax = ok.getChannelMax();
+ int okMaxFrameSize = ok.getMaxFrameSize();
if (okChannelMax > getChannelMax())
{
@@ -246,6 +235,31 @@ public class ServerConnectionDelegate extends ServerDelegate
return;
}
+ if(okMaxFrameSize > getFrameMax())
+ {
+ LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " +
+ "client connectionTuneOk returned a frameMax (" + okMaxFrameSize +
+ ") above the server's offered limit (" + getFrameMax() +")");
+
+ //Due to the error we must forcefully close the connection without negotiation
+ sconn.getSender().close();
+ return;
+ }
+ else if(okMaxFrameSize > 0 && okMaxFrameSize < Constant.MIN_MAX_FRAME_SIZE)
+ {
+ LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " +
+ "client connectionTuneOk returned a frameMax (" + okMaxFrameSize +
+ ") below the minimum permitted size (" + Constant.MIN_MAX_FRAME_SIZE +")");
+
+ //Due to the error we must forcefully close the connection without negotiation
+ sconn.getSender().close();
+ return;
+ }
+ else if(okMaxFrameSize == 0)
+ {
+ okMaxFrameSize = getFrameMax();
+ }
+
final NetworkConnection networkConnection = sconn.getNetworkConnection();
if(ok.hasHeartbeat())
{
@@ -266,6 +280,8 @@ public class ServerConnectionDelegate extends ServerDelegate
}
setConnectionTuneOkChannelMax(sconn, okChannelMax);
+
+ conn.setMaxFrameSize(okMaxFrameSize);
}
@Override
@@ -279,6 +295,12 @@ public class ServerConnectionDelegate extends ServerDelegate
_maxNoOfChannels = channelMax;
}
+ @Override
+ protected int getFrameMax()
+ {
+ return _maximumFrameSize;
+ }
+
@Override public void sessionDetach(Connection conn, SessionDetach dtc)
{
// To ensure a clean detach, we stop any remaining subscriptions. Stop ensures