diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java | 807 |
1 files changed, 616 insertions, 191 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java index f1cce2d45c..533ecfe937 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java @@ -21,82 +21,211 @@ package org.apache.qpid.server.model.adapter; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.security.AccessControlException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.UUID; + +import javax.net.ssl.KeyManagerFactory; + +import org.apache.log4j.Logger; import org.apache.qpid.common.QpidProperties; +import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.logging.LogRecorder; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.actors.BrokerActor; +import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.model.AuthenticationProvider; import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.GroupProvider; +import org.apache.qpid.server.model.KeyStore; import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Plugin; import org.apache.qpid.server.model.Port; -import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Statistics; -import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.model.TrustStore; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.security.auth.manager.AuthenticationManager; -import org.apache.qpid.server.security.auth.manager.IAuthenticationManagerRegistry; -import org.apache.qpid.server.transport.QpidAcceptor; +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.security.group.FileGroupManager; +import org.apache.qpid.server.security.group.GroupManager; +import org.apache.qpid.server.security.group.GroupPrincipalAccessor; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.security.SubjectCreator; +import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -public class BrokerAdapter extends AbstractAdapter implements Broker, VirtualHostRegistry.RegistryChangeListener, - IApplicationRegistry.PortBindingListener, - IAuthenticationManagerRegistry.RegistryChangeListener +public class BrokerAdapter extends AbstractAdapter implements Broker, ConfigurationChangeListener { - - private final IApplicationRegistry _applicationRegistry; - private String _name; - private final Map<org.apache.qpid.server.virtualhost.VirtualHost, VirtualHostAdapter> _vhostAdapters = - new HashMap<org.apache.qpid.server.virtualhost.VirtualHost, VirtualHostAdapter>(); - private final StatisticsAdapter _statistics; - private final Map<QpidAcceptor, PortAdapter> _portAdapters = new HashMap<QpidAcceptor, PortAdapter>(); - private Collection<HTTPPortAdapter> _httpManagementPorts; - - private final Map<AuthenticationManager, AuthenticationProviderAdapter> _authManagerAdapters = - new HashMap<AuthenticationManager, AuthenticationProviderAdapter>(); - - - public BrokerAdapter(final IApplicationRegistry instance) - { - super(UUIDGenerator.generateRandomUUID()); - _applicationRegistry = instance; - _name = "Broker"; - _statistics = new StatisticsAdapter(instance); - - instance.getVirtualHostRegistry().addRegistryChangeListener(this); - populateVhosts(); - instance.addPortBindingListener(this); - populatePorts(); - instance.addRegistryChangeListener(this); - populateAuthenticationManagers(); - } - - private void populateVhosts() - { - synchronized(_vhostAdapters) - { - Collection<org.apache.qpid.server.virtualhost.VirtualHost> actualVhosts = - _applicationRegistry.getVirtualHostRegistry().getVirtualHosts(); - for(org.apache.qpid.server.virtualhost.VirtualHost vh : actualVhosts) - { - if(!_vhostAdapters.containsKey(vh)) - { - _vhostAdapters.put(vh, new VirtualHostAdapter(this, vh)); - } - } - + private static final Logger LOGGER = Logger.getLogger(BrokerAdapter.class); + + @SuppressWarnings("serial") + public static final Map<String, Class<?>> ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap<String, Class<?>>(){{ + put(ALERT_THRESHOLD_MESSAGE_AGE, Long.class); + put(ALERT_THRESHOLD_MESSAGE_COUNT, Long.class); + put(ALERT_THRESHOLD_QUEUE_DEPTH, Long.class); + put(ALERT_THRESHOLD_MESSAGE_SIZE, Long.class); + put(ALERT_REPEAT_GAP, Long.class); + put(FLOW_CONTROL_SIZE_BYTES, Long.class); + put(FLOW_CONTROL_RESUME_SIZE_BYTES, Long.class); + put(HOUSEKEEPING_CHECK_PERIOD, Long.class); + + put(DEAD_LETTER_QUEUE_ENABLED, Boolean.class); + put(STATISTICS_REPORTING_RESET_ENABLED, Boolean.class); + + put(MAXIMUM_DELIVERY_ATTEMPTS, Integer.class); + put(SESSION_COUNT_LIMIT, Integer.class); + put(HEART_BEAT_DELAY, Integer.class); + put(STATISTICS_REPORTING_PERIOD, Integer.class); + + put(ACL_FILE, String.class); + put(NAME, String.class); + put(DEFAULT_VIRTUAL_HOST, String.class); + put(DEFAULT_AUTHENTICATION_PROVIDER, String.class); + + put(KEY_STORE_PATH, String.class); + put(KEY_STORE_PASSWORD, String.class); + put(KEY_STORE_CERT_ALIAS, String.class); + put(TRUST_STORE_PATH, String.class); + put(TRUST_STORE_PASSWORD, String.class); + put(GROUP_FILE, String.class); + }}); + + public static final int DEFAULT_STATISTICS_REPORTING_PERIOD = 0; + public static final boolean DEFAULT_STATISTICS_REPORTING_RESET_ENABLED = false; + public static final long DEFAULT_ALERT_REPEAT_GAP = 30000l; + public static final long DEFAULT_ALERT_THRESHOLD_MESSAGE_AGE = 0l; + public static final long DEFAULT_ALERT_THRESHOLD_MESSAGE_COUNT = 0l; + public static final long DEFAULT_ALERT_THRESHOLD_MESSAGE_SIZE = 0l; + public static final long DEFAULT_ALERT_THRESHOLD_QUEUE_DEPTH = 0l; + public static final boolean DEFAULT_DEAD_LETTER_QUEUE_ENABLED = false; + public static final int DEFAULT_MAXIMUM_DELIVERY_ATTEMPTS = 0; + public static final long DEFAULT_FLOW_CONTROL_RESUME_SIZE_BYTES = 0l; + public static final long DEFAULT_FLOW_CONTROL_SIZE_BYTES = 0l; + public static final long DEFAULT_HOUSEKEEPING_CHECK_PERIOD = 30000l; + public static final int DEFAULT_HEART_BEAT_DELAY = 0; + public static final int DEFAULT_SESSION_COUNT_LIMIT = 256; + public static final String DEFAULT_NAME = "QpidBroker"; + private static final String DEFAULT_KEY_STORE_NAME = "defaultKeyStore"; + private static final String DEFAULT_TRUST_STORE_NAME = "defaultTrustStore"; + private static final String DEFAULT_GROUP_PROFIDER_NAME = "defaultGroupProvider"; + + private static final String DUMMY_PASSWORD_MASK = "********"; + + @SuppressWarnings("serial") + private static final Map<String, Object> DEFAULTS = Collections.unmodifiableMap(new HashMap<String, Object>(){{ + put(Broker.STATISTICS_REPORTING_PERIOD, DEFAULT_STATISTICS_REPORTING_PERIOD); + put(Broker.STATISTICS_REPORTING_RESET_ENABLED, DEFAULT_STATISTICS_REPORTING_RESET_ENABLED); + put(Broker.ALERT_REPEAT_GAP, DEFAULT_ALERT_REPEAT_GAP); + put(Broker.ALERT_THRESHOLD_MESSAGE_AGE, DEFAULT_ALERT_THRESHOLD_MESSAGE_AGE); + put(Broker.ALERT_THRESHOLD_MESSAGE_COUNT, DEFAULT_ALERT_THRESHOLD_MESSAGE_COUNT); + put(Broker.ALERT_THRESHOLD_MESSAGE_SIZE, DEFAULT_ALERT_THRESHOLD_MESSAGE_SIZE); + put(Broker.ALERT_THRESHOLD_QUEUE_DEPTH, DEFAULT_ALERT_THRESHOLD_QUEUE_DEPTH); + put(Broker.DEAD_LETTER_QUEUE_ENABLED, DEFAULT_DEAD_LETTER_QUEUE_ENABLED); + put(Broker.MAXIMUM_DELIVERY_ATTEMPTS, DEFAULT_MAXIMUM_DELIVERY_ATTEMPTS); + put(Broker.FLOW_CONTROL_RESUME_SIZE_BYTES, DEFAULT_FLOW_CONTROL_RESUME_SIZE_BYTES); + put(Broker.FLOW_CONTROL_SIZE_BYTES, DEFAULT_FLOW_CONTROL_SIZE_BYTES); + put(Broker.HOUSEKEEPING_CHECK_PERIOD, DEFAULT_HOUSEKEEPING_CHECK_PERIOD); + put(Broker.HEART_BEAT_DELAY, DEFAULT_HEART_BEAT_DELAY); + put(Broker.SESSION_COUNT_LIMIT, DEFAULT_SESSION_COUNT_LIMIT); + put(Broker.NAME, DEFAULT_NAME); + }}); + + + + + private final StatisticsGatherer _statisticsGatherer; + private final VirtualHostRegistry _virtualHostRegistry; + private final LogRecorder _logRecorder; + private final RootMessageLogger _rootMessageLogger; + private StatisticsAdapter _statistics; + + private final Map<String, VirtualHost> _vhostAdapters = new HashMap<String, VirtualHost>(); + private final Map<Integer, Port> _portAdapters = new HashMap<Integer, Port>(); + private final Map<String, AuthenticationProvider> _authenticationProviders = new HashMap<String, AuthenticationProvider>(); + private final Map<String, GroupProvider> _groupProviders = new HashMap<String, GroupProvider>(); + private final Map<UUID, ConfiguredObject> _plugins = new HashMap<UUID, ConfiguredObject>(); + private final Map<UUID, KeyStore> _keyStores = new HashMap<UUID, KeyStore>(); + private final Map<UUID, TrustStore> _trustStores = new HashMap<UUID, TrustStore>(); + + private final AuthenticationProviderFactory _authenticationProviderFactory; + private AuthenticationProvider _defaultAuthenticationProvider; + + private final PortFactory _portFactory; + private final SecurityManager _securityManager; + private final UUID _defaultKeyStoreId; + private final UUID _defaultTrustStoreId; + + public BrokerAdapter(UUID id, Map<String, Object> attributes, StatisticsGatherer statisticsGatherer, VirtualHostRegistry virtualHostRegistry, + LogRecorder logRecorder, RootMessageLogger rootMessageLogger, AuthenticationProviderFactory authenticationProviderFactory, + PortFactory portFactory, TaskExecutor taskExecutor) + { + super(id, DEFAULTS, MapValueConverter.convert(attributes, ATTRIBUTE_TYPES), taskExecutor); + _statisticsGatherer = statisticsGatherer; + _virtualHostRegistry = virtualHostRegistry; + _logRecorder = logRecorder; + _rootMessageLogger = rootMessageLogger; + _statistics = new StatisticsAdapter(statisticsGatherer); + _authenticationProviderFactory = authenticationProviderFactory; + _portFactory = portFactory; + _securityManager = new SecurityManager((String)getAttribute(ACL_FILE)); + + _defaultKeyStoreId = UUIDGenerator.generateBrokerChildUUID(KeyStore.class.getSimpleName(), DEFAULT_KEY_STORE_NAME); + _defaultTrustStoreId = UUIDGenerator.generateBrokerChildUUID(TrustStore.class.getSimpleName(), DEFAULT_TRUST_STORE_NAME); + createBrokerChildrenFromAttributes(); + } + + /* + * A temporary method to create broker children that can be only configured via broker attributes + */ + private void createBrokerChildrenFromAttributes() + { + String groupFile = (String) getAttribute(GROUP_FILE); + if (groupFile != null) + { + GroupManager groupManager = new FileGroupManager(groupFile); + UUID groupProviderId = UUIDGenerator.generateBrokerChildUUID(GroupProvider.class.getSimpleName(), + DEFAULT_GROUP_PROFIDER_NAME); + GroupProviderAdapter groupProviderAdapter = new GroupProviderAdapter(groupProviderId, groupManager, this); + addGroupProvider(groupProviderAdapter); + } + Map<String, Object> actualAttributes = getActualAttributes(); + String keyStorePath = (String) getAttribute(KEY_STORE_PATH); + if (keyStorePath != null) + { + Map<String, Object> keyStoreAttributes = new HashMap<String, Object>(); + keyStoreAttributes.put(KeyStore.NAME, DEFAULT_KEY_STORE_NAME); + keyStoreAttributes.put(KeyStore.PATH, keyStorePath); + keyStoreAttributes.put(KeyStore.PASSWORD, (String) actualAttributes.get(KEY_STORE_PASSWORD)); + keyStoreAttributes.put(KeyStore.TYPE, java.security.KeyStore.getDefaultType()); + keyStoreAttributes.put(KeyStore.CERTIFICATE_ALIAS, getAttribute(KEY_STORE_CERT_ALIAS)); + keyStoreAttributes.put(KeyStore.KEY_MANAGER_FACTORY_ALGORITHM, KeyManagerFactory.getDefaultAlgorithm()); + KeyStoreAdapter KeyStoreAdapter = new KeyStoreAdapter(_defaultKeyStoreId, this, keyStoreAttributes); + addKeyStore(KeyStoreAdapter); + } + String trustStorePath = (String) getAttribute(TRUST_STORE_PATH); + if (trustStorePath != null) + { + Map<String, Object> trsustStoreAttributes = new HashMap<String, Object>(); + trsustStoreAttributes.put(TrustStore.NAME, DEFAULT_TRUST_STORE_NAME); + trsustStoreAttributes.put(TrustStore.PATH, trustStorePath); + trsustStoreAttributes.put(TrustStore.PASSWORD, (String) actualAttributes.get(TRUST_STORE_PASSWORD)); + trsustStoreAttributes.put(TrustStore.TYPE, java.security.KeyStore.getDefaultType()); + trsustStoreAttributes.put(TrustStore.KEY_MANAGER_FACTORY_ALGORITHM, KeyManagerFactory.getDefaultAlgorithm()); + TrustStoreAdapter trustStore = new TrustStoreAdapter(_defaultTrustStoreId, this, trsustStoreAttributes); + addTrustStore(trustStore); } } - public Collection<VirtualHost> getVirtualHosts() { synchronized(_vhostAdapters) @@ -105,81 +234,57 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, VirtualHos } } - private void populatePorts() + + public Collection<Port> getPorts() { synchronized (_portAdapters) { - Map<InetSocketAddress, QpidAcceptor> acceptors = _applicationRegistry.getAcceptors(); - - for(Map.Entry<InetSocketAddress, QpidAcceptor> entry : acceptors.entrySet()) - { - if(!_portAdapters.containsKey(entry.getValue())) - { - _portAdapters.put(entry.getValue(), new PortAdapter(this, entry.getValue(), entry.getKey())); - } - } - if(_applicationRegistry.useHTTPManagement() || _applicationRegistry.useHTTPSManagement()) - { - ArrayList<HTTPPortAdapter> httpPorts = new ArrayList<HTTPPortAdapter>(); - if (_applicationRegistry.useHTTPManagement()) - { - httpPorts.add(new HTTPPortAdapter(this, _applicationRegistry.getHTTPManagementPort())); - } - if (_applicationRegistry.useHTTPSManagement()) - { - httpPorts.add(new HTTPPortAdapter(this, _applicationRegistry.getHTTPSManagementPort(), Protocol.HTTPS, Transport.SSL)); - } - _httpManagementPorts = Collections.unmodifiableCollection(httpPorts); - } + final ArrayList<Port> ports = new ArrayList<Port>(_portAdapters.values()); + return ports; } } - public Collection<Port> getPorts() + public Collection<AuthenticationProvider> getAuthenticationProviders() { - synchronized (_portAdapters) + synchronized (_authenticationProviders) { - final ArrayList<Port> ports = new ArrayList<Port>(_portAdapters.values()); - if(_httpManagementPorts != null) - { - ports.addAll(_httpManagementPorts); - } - return ports; + return new ArrayList<AuthenticationProvider>(_authenticationProviders.values()); } } - private void populateAuthenticationManagers() + public AuthenticationProvider getAuthenticationProviderByName(String authenticationProviderName) { - synchronized (_authManagerAdapters) + Collection<AuthenticationProvider> providers = getAuthenticationProviders(); + for (AuthenticationProvider authenticationProvider : providers) { - IAuthenticationManagerRegistry authenticationManagerRegistry = - _applicationRegistry.getAuthenticationManagerRegistry(); - if(authenticationManagerRegistry != null) + if (authenticationProvider.getName().equals(authenticationProviderName)) { - Map<String, AuthenticationManager> authenticationManagers = - authenticationManagerRegistry.getAvailableAuthenticationManagers(); - - for(Map.Entry<String, AuthenticationManager> entry : authenticationManagers.entrySet()) - { - if(!_authManagerAdapters.containsKey(entry.getValue())) - { - _authManagerAdapters.put(entry.getValue(), - AuthenticationProviderAdapter.createAuthenticationProviderAdapter(this, - entry.getValue())); - } - } + return authenticationProvider; } } + return null; } - public Collection<AuthenticationProvider> getAuthenticationProviders() + @Override + public AuthenticationProvider getDefaultAuthenticationProvider() + { + return _defaultAuthenticationProvider; + } + + public void setDefaultAuthenticationProvider(AuthenticationProvider provider) + { + _defaultAuthenticationProvider = provider; + } + + @Override + public Collection<GroupProvider> getGroupProviders() { - synchronized (_authManagerAdapters) + synchronized (_groupProviders) { - final ArrayList<AuthenticationProvider> authManagers = - new ArrayList<AuthenticationProvider>(_authManagerAdapters.values()); - return authManagers; + final ArrayList<GroupProvider> groupManagers = + new ArrayList<GroupProvider>(_groupProviders.values()); + return groupManagers; } - } public VirtualHost createVirtualHost(final String name, @@ -193,22 +298,29 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, VirtualHos return null; //TODO } - public VirtualHost createVirtualHost(final Map<String, Object> attributes) + private VirtualHost createVirtualHost(final Map<String, Object> attributes) throws AccessControlException, IllegalArgumentException { - return null; //TODO + final VirtualHostAdapter virtualHostAdapter = new VirtualHostAdapter(UUID.randomUUID(), attributes, this, + _statisticsGatherer, getTaskExecutor()); + addVirtualHost(virtualHostAdapter); + virtualHostAdapter.setDesiredState(State.INITIALISING, State.ACTIVE); + return virtualHostAdapter; } - public void deleteVirtualHost(final VirtualHost vhost) - throws AccessControlException, IllegalStateException + private boolean deleteVirtualHost(final VirtualHost vhost) throws AccessControlException, IllegalStateException { - //TODO - throw new UnsupportedOperationException("Not yet implemented"); + synchronized (_vhostAdapters) + { + _vhostAdapters.remove(vhost); + } + vhost.removeChangeListener(this); + return true; } public String getName() { - return _name; + return (String)getAttribute(NAME); } public String setName(final String currentName, final String desiredName) @@ -262,6 +374,7 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, VirtualHos return _statistics; } + @SuppressWarnings("unchecked") @Override public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz) { @@ -277,12 +390,30 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, VirtualHos { return (Collection<C>) getAuthenticationProviders(); } + else if(clazz == GroupProvider.class) + { + return (Collection<C>) getGroupProviders(); + } + else if(clazz == KeyStore.class) + { + return (Collection<C>) getKeyStores(); + } + else if(clazz == TrustStore.class) + { + return (Collection<C>) getTrustStores(); + } + else if(clazz == Plugin.class) + { + return (Collection<C>) getPlugins(); + } return Collections.emptySet(); } + //TODO: ACL + @SuppressWarnings("unchecked") @Override - public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) + public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents) { if(childClass == VirtualHost.class) { @@ -302,111 +433,107 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, VirtualHos } } - private Port createPort(Map<String, Object> attributes) + private void addPort(Port port) { - // TODO - return null; + synchronized (_portAdapters) + { + int portNumber = port.getPort(); + if(_portAdapters.containsKey(portNumber)) + { + throw new IllegalArgumentException("Cannot add port " + port + " because port number " + portNumber + " already configured"); + } + _portAdapters.put(portNumber, port); + } + port.addChangeListener(this); } - private AuthenticationProvider createAuthenticationProvider(Map<String,Object> attributes) + private Port createPort(Map<String, Object> attributes) { - // TODO - return null; + Port port = _portFactory.createPort(UUID.randomUUID(), this, attributes); + addPort(port); + return port; } + private AuthenticationProvider createAuthenticationProvider(Map<String, Object> attributes) + { + // it's cheap to create the groupPrincipalAccessor on the fly + GroupPrincipalAccessor groupPrincipalAccessor = new GroupPrincipalAccessor(_groupProviders.values()); + + AuthenticationProvider authenticationProvider = _authenticationProviderFactory.create(UUID.randomUUID(), this, attributes, groupPrincipalAccessor); + addAuthenticationProvider(authenticationProvider); + return authenticationProvider; + } - public void virtualHostRegistered(org.apache.qpid.server.virtualhost.VirtualHost virtualHost) + /** + * @throws IllegalConfigurationException if an AuthenticationProvider with the same name already exists + */ + private void addAuthenticationProvider(AuthenticationProvider authenticationProvider) { - VirtualHostAdapter adapter = null; - synchronized (_vhostAdapters) + String name = authenticationProvider.getName(); + synchronized (_authenticationProviders) { - if(!_vhostAdapters.containsKey(virtualHost)) + if(_authenticationProviders.containsKey(name)) { - adapter = new VirtualHostAdapter(this, virtualHost); - _vhostAdapters.put(virtualHost, adapter); + throw new IllegalConfigurationException("Cannot add AuthenticationProvider because one with name " + name + " already exists"); } + _authenticationProviders.put(name, authenticationProvider); } - if(adapter != null) - { - childAdded(adapter); - } + authenticationProvider.addChangeListener(this); } - public void virtualHostUnregistered(org.apache.qpid.server.virtualhost.VirtualHost virtualHost) + private void addGroupProvider(GroupProvider groupProvider) { - VirtualHostAdapter adapter = null; - - synchronized (_vhostAdapters) - { - adapter = _vhostAdapters.remove(virtualHost); - } - if(adapter != null) + synchronized (_groupProviders) { - childRemoved(adapter); + String name = groupProvider.getName(); + if(_groupProviders.containsKey(name)) + { + throw new IllegalConfigurationException("Cannot add GroupProvider because one with name " + name + " already exists"); + } + _groupProviders.put(name, groupProvider); } + groupProvider.addChangeListener(this); } - @Override - public void authenticationManagerRegistered(AuthenticationManager authenticationManager) + private boolean deleteGroupProvider(GroupProvider object) + { + throw new UnsupportedOperationException("Not implemented yet!"); + } + + private void addKeyStore(KeyStore keyStore) { - AuthenticationProviderAdapter adapter = null; - synchronized (_authManagerAdapters) + synchronized (_keyStores) { - if(!_authManagerAdapters.containsKey(authenticationManager)) + if(_keyStores.containsKey(keyStore.getId())) { - adapter = - AuthenticationProviderAdapter.createAuthenticationProviderAdapter(this, authenticationManager); - _authManagerAdapters.put(authenticationManager, adapter); + throw new IllegalConfigurationException("Cannot add KeyStore because one with id " + keyStore.getId() + " already exists"); } + _keyStores.put(keyStore.getId(), keyStore); } - if(adapter != null) - { - childAdded(adapter); - } + keyStore.addChangeListener(this); } - @Override - public void authenticationManagerUnregistered(AuthenticationManager authenticationManager) + private boolean deleteKeyStore(KeyStore object) { - AuthenticationProviderAdapter adapter; - synchronized (_authManagerAdapters) - { - adapter = _authManagerAdapters.remove(authenticationManager); - } - if(adapter != null) - { - childRemoved(adapter); - } + throw new UnsupportedOperationException("Not implemented yet!"); } - - @Override - public void bound(QpidAcceptor acceptor, InetSocketAddress bindAddress) + private void addTrustStore(TrustStore trustStore) { - synchronized (_portAdapters) + synchronized (_trustStores) { - if(!_portAdapters.containsKey(acceptor)) + if(_trustStores.containsKey(trustStore.getId())) { - PortAdapter adapter = new PortAdapter(this, acceptor, bindAddress); - _portAdapters.put(acceptor, adapter); - childAdded(adapter); + throw new IllegalConfigurationException("Cannot add TrustStore because one with id " + trustStore.getId() + " already exists"); } + _trustStores.put(trustStore.getId(), trustStore); } + trustStore.addChangeListener(this); } - @Override - public void unbound(QpidAcceptor acceptor) + private boolean deleteTrustStore(TrustStore object) { - PortAdapter adapter = null; - - synchronized (_portAdapters) - { - adapter = _portAdapters.remove(acceptor); - } - if(adapter != null) - { - childRemoved(adapter); - } + throw new UnsupportedOperationException("Not implemented yet!"); } @Override @@ -422,10 +549,6 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, VirtualHos { return getId(); } - else if(NAME.equals(name)) - { - return getName(); - } else if(STATE.equals(name)) { return State.ACTIVE; @@ -481,14 +604,316 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, VirtualHos { // TODO } + else if (DEFAULT_AUTHENTICATION_PROVIDER.equals(name)) + { + return _defaultAuthenticationProvider == null ? null : _defaultAuthenticationProvider.getName(); + } + else if (KEY_STORE_PASSWORD.equals(name)) + { + return DUMMY_PASSWORD_MASK; + } + else if (TRUST_STORE_PASSWORD.equals(name)) + { + return DUMMY_PASSWORD_MASK; + } + return super.getAttribute(name); + } - return super.getAttribute(name); //TODO - Implement. + private boolean deletePort(Port portAdapter) + { + Port removedPort = null; + synchronized (_portAdapters) + { + removedPort = _portAdapters.remove(portAdapter.getPort()); + } + return removedPort != null; + } + + private boolean deleteAuthenticationProvider(AuthenticationProvider authenticationProvider) + { + AuthenticationProvider removedAuthenticationProvider = null; + synchronized (_authenticationProviders) + { + removedAuthenticationProvider = _authenticationProviders.remove(authenticationProvider.getName()); + } + return removedAuthenticationProvider != null; + } + + private void addVirtualHost(VirtualHost virtualHost) + { + synchronized (_vhostAdapters) + { + String name = virtualHost.getName(); + if (_vhostAdapters.containsKey(name)) + { + throw new IllegalConfigurationException("Virtual host with name " + name + " is already specified!"); + } + _vhostAdapters.put(name, virtualHost); + } + virtualHost.addChangeListener(this); } @Override - public Object setAttribute(String name, Object expected, Object desired) - throws IllegalStateException, AccessControlException, IllegalArgumentException + public boolean setState(State currentState, State desiredState) + { + if (desiredState == State.ACTIVE) + { + changeState(_groupProviders, currentState, State.ACTIVE, false); + changeState(_authenticationProviders, currentState, State.ACTIVE, false); + + CurrentActor.set(new BrokerActor(getRootMessageLogger())); + try + { + changeState(_vhostAdapters, currentState, State.ACTIVE, false); + } + finally + { + CurrentActor.remove(); + } + + changeState(_portAdapters, currentState,State.ACTIVE, false); + changeState(_plugins, currentState,State.ACTIVE, false); + return true; + } + else if (desiredState == State.STOPPED) + { + changeState(_plugins, currentState,State.STOPPED, true); + changeState(_portAdapters, currentState, State.STOPPED, true); + changeState(_vhostAdapters,currentState, State.STOPPED, true); + changeState(_authenticationProviders, currentState, State.STOPPED, true); + changeState(_groupProviders, currentState, State.STOPPED, true); + return true; + } + return false; + } + + private void changeState(Map<?, ? extends ConfiguredObject> configuredObjectMap, State currentState, State desiredState, boolean swallowException) + { + synchronized(configuredObjectMap) + { + Collection<? extends ConfiguredObject> adapters = configuredObjectMap.values(); + for (ConfiguredObject configuredObject : adapters) + { + if (State.ACTIVE.equals(desiredState) && State.QUIESCED.equals(configuredObject.getActualState())) + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug(configuredObject + " cannot be activated as it is " +State.QUIESCED); + } + continue; + } + try + { + configuredObject.setDesiredState(currentState, desiredState); + } + catch(RuntimeException e) + { + if (swallowException) + { + LOGGER.error("Failed to stop " + configuredObject, e); + } + else + { + throw e; + } + } + } + } + } + + @Override + public void stateChanged(ConfiguredObject object, State oldState, State newState) + { + if(newState == State.DELETED) + { + boolean childDeleted = false; + if(object instanceof AuthenticationProvider) + { + childDeleted = deleteAuthenticationProvider((AuthenticationProvider)object); + } + else if(object instanceof Port) + { + childDeleted = deletePort((Port)object); + } + else if(object instanceof VirtualHost) + { + childDeleted = deleteVirtualHost((VirtualHost)object); + } + else if(object instanceof GroupProvider) + { + childDeleted = deleteGroupProvider((GroupProvider)object); + } + else if(object instanceof KeyStore) + { + childDeleted = deleteKeyStore((KeyStore)object); + } + else if(object instanceof TrustStore) + { + childDeleted = deleteTrustStore((TrustStore)object); + } + if(childDeleted) + { + childRemoved(object); + } + } + } + + @Override + public void childAdded(ConfiguredObject object, ConfiguredObject child) + { + // no-op + } + + @Override + public void childRemoved(ConfiguredObject object, ConfiguredObject child) + { + // no-op + } + + @Override + public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, Object newAttributeValue) + { + // no-op + } + + private void addPlugin(ConfiguredObject plugin) + { + synchronized(_plugins) + { + if (_plugins.containsKey(plugin.getId())) + { + throw new IllegalConfigurationException("Plugin with id '" + plugin.getId() + "' is already registered!"); + } + _plugins.put(plugin.getId(), plugin); + } + plugin.addChangeListener(this); + } + + + private Collection<ConfiguredObject> getPlugins() + { + synchronized(_plugins) + { + return Collections.unmodifiableCollection(_plugins.values()); + } + } + + public void recoverChild(ConfiguredObject object) + { + if(object instanceof AuthenticationProvider) + { + addAuthenticationProvider((AuthenticationProvider)object); + } + else if(object instanceof Port) + { + addPort((Port)object); + } + else if(object instanceof VirtualHost) + { + addVirtualHost((VirtualHost)object); + } + else if(object instanceof GroupProvider) + { + addGroupProvider((GroupProvider)object); + } + else if(object instanceof KeyStore) + { + addKeyStore((KeyStore)object); + } + else if(object instanceof TrustStore) + { + addTrustStore((TrustStore)object); + } + else if(object instanceof Plugin) + { + addPlugin(object); + } + else + { + throw new IllegalArgumentException("Attempted to recover unexpected type of configured object: " + object.getClass().getName()); + } + } + + @Override + public RootMessageLogger getRootMessageLogger() + { + return _rootMessageLogger; + } + + @Override + public SecurityManager getSecurityManager() + { + return _securityManager; + } + + @Override + public LogRecorder getLogRecorder() + { + return _logRecorder; + } + + @Override + public VirtualHost findVirtualHostByName(String name) + { + return _vhostAdapters.get(name); + } + + @Override + public SubjectCreator getSubjectCreator(SocketAddress localAddress) + { + InetSocketAddress inetSocketAddress = (InetSocketAddress)localAddress; + AuthenticationProvider provider = _defaultAuthenticationProvider; + Collection<Port> ports = getPorts(); + for (Port p : ports) + { + if (inetSocketAddress.getPort() == p.getPort()) + { + provider = p.getAuthenticationProvider(); + break; + } + } + return provider.getSubjectCreator(); + } + + @Override + public Collection<KeyStore> getKeyStores() + { + synchronized(_trustStores) + { + return Collections.unmodifiableCollection(_keyStores.values()); + } + } + + @Override + public Collection<TrustStore> getTrustStores() + { + synchronized(_trustStores) + { + return Collections.unmodifiableCollection(_trustStores.values()); + } + } + + @Override + public VirtualHostRegistry getVirtualHostRegistry() + { + return _virtualHostRegistry; + } + + @Override + public KeyStore getDefaultKeyStore() + { + return _keyStores.get(_defaultKeyStoreId); + } + + @Override + public TrustStore getDefaultTrustStore() + { + return _trustStores.get(_defaultTrustStoreId); + } + + @Override + public TaskExecutor getTaskExecutor() { - return super.setAttribute(name, expected, desired); //TODO - Implement. + return super.getTaskExecutor(); } } |