diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java | 76 |
1 files changed, 47 insertions, 29 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java index c13f63b44d..f3153fde62 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -32,18 +32,19 @@ import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.properties.ConnectionStartProperties; -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.server.configuration.BrokerConfig; +import org.apache.qpid.server.configuration.BrokerProperties; +import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.AMQConnectionModel; -import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.security.auth.AuthenticationResult; +import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; -import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; import org.apache.qpid.server.subscription.Subscription_0_10; import org.apache.qpid.server.virtualhost.State; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.*; +import org.apache.qpid.transport.network.NetworkConnection; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,36 +54,49 @@ public class ServerConnectionDelegate extends ServerDelegate { private static final Logger LOGGER = LoggerFactory.getLogger(ServerConnectionDelegate.class); + private final Broker _broker; private final String _localFQDN; - private final IApplicationRegistry _appRegistry; private int _maxNoOfChannels; private Map<String,Object> _clientProperties; - private final AuthenticationManager _authManager; + private final SubjectCreator _subjectCreator; - public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN, AuthenticationManager authManager) + public ServerConnectionDelegate(Broker broker, String localFQDN, SubjectCreator subjectCreator) { - this(createConnectionProperties(appRegistry.getBrokerConfig()), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN, authManager); + this(createConnectionProperties(broker), Collections.singletonList((Object)"en_US"), broker, localFQDN, subjectCreator); } private ServerConnectionDelegate(Map<String, Object> properties, List<Object> locales, - IApplicationRegistry appRegistry, + Broker broker, String localFQDN, - AuthenticationManager authManager) + SubjectCreator subjectCreator) { - super(properties, parseToList(authManager.getMechanisms()), locales); + super(properties, parseToList(subjectCreator.getMechanisms()), locales); - _appRegistry = appRegistry; + _broker = broker; _localFQDN = localFQDN; - _maxNoOfChannels = appRegistry.getConfiguration().getMaxChannelCount(); - _authManager = authManager; + _maxNoOfChannels = (Integer)broker.getAttribute(Broker.SESSION_COUNT_LIMIT); + _subjectCreator = subjectCreator; + } + + private static List<String> getFeatures(Broker broker) + { + String brokerDisabledFeatures = System.getProperty(BrokerProperties.PROPERTY_DISABLED_FEATURES); + final List<String> features = new ArrayList<String>(); + if (brokerDisabledFeatures == null || !brokerDisabledFeatures.contains(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR)) + { + features.add(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR); + } + + return Collections.unmodifiableList(features); } - private static Map<String, Object> createConnectionProperties(final BrokerConfig brokerConfig) + private static Map<String, Object> createConnectionProperties(final Broker broker) { final Map<String,Object> map = new HashMap<String,Object>(2); - map.put(ServerPropertyNames.FEDERATION_TAG, brokerConfig.getFederationTag()); - final List<String> features = brokerConfig.getFeatures(); + // Federation tag is used by the client to identify the broker instance + map.put(ServerPropertyNames.FEDERATION_TAG, broker.getId().toString()); + final List<String> features = getFeatures(broker); if (features != null && features.size() > 0) { map.put(ServerPropertyNames.QPID_FEATURES, features); @@ -112,14 +126,14 @@ public class ServerConnectionDelegate extends ServerDelegate protected SaslServer createSaslServer(Connection conn, String mechanism) throws SaslException { - return _authManager.createSaslServer(mechanism, _localFQDN, ((ServerConnection) conn).getPeerPrincipal()); + return _subjectCreator.createSaslServer(mechanism, _localFQDN, ((ServerConnection) conn).getPeerPrincipal()); } protected void secure(final SaslServer ss, final Connection conn, final byte[] response) { final ServerConnection sconn = (ServerConnection) conn; - final AuthenticationResult authResult = _authManager.authenticate(ss, response); + final SubjectAuthenticationResult authResult = _subjectCreator.authenticate(ss, response); if (AuthenticationStatus.SUCCESS.equals(authResult.getStatus())) { @@ -166,7 +180,7 @@ public class ServerConnectionDelegate extends ServerDelegate { vhostName = ""; } - vhost = _appRegistry.getVirtualHostRegistry().getVirtualHost(vhostName); + vhost = _broker.getVirtualHostRegistry().getVirtualHost(vhostName); SecurityManager.setThreadSubject(sconn.getAuthorizedSubject()); @@ -174,7 +188,7 @@ public class ServerConnectionDelegate extends ServerDelegate { sconn.setVirtualHost(vhost); - if (!vhost.getSecurityManager().accessVirtualhost(vhostName, ((ProtocolEngine) sconn.getConfig()).getRemoteAddress())) + if (!vhost.getSecurityManager().accessVirtualhost(vhostName, sconn.getRemoteAddress())) { sconn.setState(Connection.State.CLOSING); sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Permission denied '"+vhostName+"'")); @@ -215,14 +229,18 @@ public class ServerConnectionDelegate extends ServerDelegate return; } - setConnectionTuneOkChannelMax(sconn, okChannelMax); - } + if(ok.hasHeartbeat()) + { + final int heartbeat = ok.getHeartbeat(); + if(heartbeat > 0) + { + final NetworkConnection networkConnection = sconn.getNetworkConnection(); + networkConnection.setMaxReadIdle(2 * heartbeat); + networkConnection.setMaxWriteIdle(heartbeat); + } + } - @Override - protected int getHeartbeatMax() - { - //TODO: implement broker support for actually sending heartbeats - return 0; + setConnectionTuneOkChannelMax(sconn, okChannelMax); } @Override |