summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java76
1 files changed, 47 insertions, 29 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
index c13f63b44d..f3153fde62 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
@@ -32,18 +32,19 @@ import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.properties.ConnectionStartProperties;
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.server.configuration.BrokerConfig;
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.virtualhost.State;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.*;
+import org.apache.qpid.transport.network.NetworkConnection;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,36 +54,49 @@ public class ServerConnectionDelegate extends ServerDelegate
{
private static final Logger LOGGER = LoggerFactory.getLogger(ServerConnectionDelegate.class);
+ private final Broker _broker;
private final String _localFQDN;
- private final IApplicationRegistry _appRegistry;
private int _maxNoOfChannels;
private Map<String,Object> _clientProperties;
- private final AuthenticationManager _authManager;
+ private final SubjectCreator _subjectCreator;
- public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN, AuthenticationManager authManager)
+ public ServerConnectionDelegate(Broker broker, String localFQDN, SubjectCreator subjectCreator)
{
- this(createConnectionProperties(appRegistry.getBrokerConfig()), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN, authManager);
+ this(createConnectionProperties(broker), Collections.singletonList((Object)"en_US"), broker, localFQDN, subjectCreator);
}
private ServerConnectionDelegate(Map<String, Object> properties,
List<Object> locales,
- IApplicationRegistry appRegistry,
+ Broker broker,
String localFQDN,
- AuthenticationManager authManager)
+ SubjectCreator subjectCreator)
{
- super(properties, parseToList(authManager.getMechanisms()), locales);
+ super(properties, parseToList(subjectCreator.getMechanisms()), locales);
- _appRegistry = appRegistry;
+ _broker = broker;
_localFQDN = localFQDN;
- _maxNoOfChannels = appRegistry.getConfiguration().getMaxChannelCount();
- _authManager = authManager;
+ _maxNoOfChannels = (Integer)broker.getAttribute(Broker.SESSION_COUNT_LIMIT);
+ _subjectCreator = subjectCreator;
+ }
+
+ private static List<String> getFeatures(Broker broker)
+ {
+ String brokerDisabledFeatures = System.getProperty(BrokerProperties.PROPERTY_DISABLED_FEATURES);
+ final List<String> features = new ArrayList<String>();
+ if (brokerDisabledFeatures == null || !brokerDisabledFeatures.contains(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR))
+ {
+ features.add(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR);
+ }
+
+ return Collections.unmodifiableList(features);
}
- private static Map<String, Object> createConnectionProperties(final BrokerConfig brokerConfig)
+ private static Map<String, Object> createConnectionProperties(final Broker broker)
{
final Map<String,Object> map = new HashMap<String,Object>(2);
- map.put(ServerPropertyNames.FEDERATION_TAG, brokerConfig.getFederationTag());
- final List<String> features = brokerConfig.getFeatures();
+ // Federation tag is used by the client to identify the broker instance
+ map.put(ServerPropertyNames.FEDERATION_TAG, broker.getId().toString());
+ final List<String> features = getFeatures(broker);
if (features != null && features.size() > 0)
{
map.put(ServerPropertyNames.QPID_FEATURES, features);
@@ -112,14 +126,14 @@ public class ServerConnectionDelegate extends ServerDelegate
protected SaslServer createSaslServer(Connection conn, String mechanism) throws SaslException
{
- return _authManager.createSaslServer(mechanism, _localFQDN, ((ServerConnection) conn).getPeerPrincipal());
+ return _subjectCreator.createSaslServer(mechanism, _localFQDN, ((ServerConnection) conn).getPeerPrincipal());
}
protected void secure(final SaslServer ss, final Connection conn, final byte[] response)
{
final ServerConnection sconn = (ServerConnection) conn;
- final AuthenticationResult authResult = _authManager.authenticate(ss, response);
+ final SubjectAuthenticationResult authResult = _subjectCreator.authenticate(ss, response);
if (AuthenticationStatus.SUCCESS.equals(authResult.getStatus()))
{
@@ -166,7 +180,7 @@ public class ServerConnectionDelegate extends ServerDelegate
{
vhostName = "";
}
- vhost = _appRegistry.getVirtualHostRegistry().getVirtualHost(vhostName);
+ vhost = _broker.getVirtualHostRegistry().getVirtualHost(vhostName);
SecurityManager.setThreadSubject(sconn.getAuthorizedSubject());
@@ -174,7 +188,7 @@ public class ServerConnectionDelegate extends ServerDelegate
{
sconn.setVirtualHost(vhost);
- if (!vhost.getSecurityManager().accessVirtualhost(vhostName, ((ProtocolEngine) sconn.getConfig()).getRemoteAddress()))
+ if (!vhost.getSecurityManager().accessVirtualhost(vhostName, sconn.getRemoteAddress()))
{
sconn.setState(Connection.State.CLOSING);
sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Permission denied '"+vhostName+"'"));
@@ -215,14 +229,18 @@ public class ServerConnectionDelegate extends ServerDelegate
return;
}
- setConnectionTuneOkChannelMax(sconn, okChannelMax);
- }
+ if(ok.hasHeartbeat())
+ {
+ final int heartbeat = ok.getHeartbeat();
+ if(heartbeat > 0)
+ {
+ final NetworkConnection networkConnection = sconn.getNetworkConnection();
+ networkConnection.setMaxReadIdle(2 * heartbeat);
+ networkConnection.setMaxWriteIdle(heartbeat);
+ }
+ }
- @Override
- protected int getHeartbeatMax()
- {
- //TODO: implement broker support for actually sending heartbeats
- return 0;
+ setConnectionTuneOkChannelMax(sconn, okChannelMax);
}
@Override