diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java | 173 |
1 files changed, 32 insertions, 141 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index d9dc0aa64e..dd3610373f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -35,12 +34,8 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.binding.BindingFactory; -import org.apache.qpid.server.configuration.BrokerConfig; -import org.apache.qpid.server.configuration.ConfigStore; -import org.apache.qpid.server.configuration.ConfiguredObject; import org.apache.qpid.server.configuration.ExchangeConfiguration; import org.apache.qpid.server.configuration.QueueConfiguration; -import org.apache.qpid.server.configuration.VirtualHostConfigType; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.ConnectionRegistry; import org.apache.qpid.server.connection.IConnectionRegistry; @@ -49,7 +44,6 @@ import org.apache.qpid.server.exchange.DefaultExchangeRegistry; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; @@ -61,17 +55,16 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.DefaultQueueRegistry; import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.HAMessageStore; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreCreator; import org.apache.qpid.server.store.OperationalLoggingListener; import org.apache.qpid.server.txn.DtxRegistry; -import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin; -import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener { @@ -79,23 +72,19 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5; - private final UUID _qmfId; - private final String _name; private final UUID _id; private final long _createTime = System.currentTimeMillis(); - private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>(); - private final ScheduledThreadPoolExecutor _houseKeepingTasks; - private final IApplicationRegistry _appRegistry; + private final VirtualHostRegistry _virtualHostRegistry; - private final SecurityManager _securityManager; + private final StatisticsGatherer _brokerStatisticsGatherer; - private final BrokerConfig _brokerConfig; + private final SecurityManager _securityManager; private final VirtualHostConfiguration _vhostConfig; @@ -120,7 +109,7 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>(); private boolean _blocked; - public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig) throws Exception + public VirtualHostImpl(VirtualHostRegistry virtualHostRegistry, StatisticsGatherer brokerStatisticsGatherer, SecurityManager parentSecurityManager, VirtualHostConfiguration hostConfig) throws Exception { if (hostConfig == null) { @@ -132,19 +121,17 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr throw new IllegalArgumentException("Illegal name (" + hostConfig.getName() + ") for virtualhost."); } - _appRegistry = appRegistry; - _brokerConfig = _appRegistry.getBrokerConfig(); + _virtualHostRegistry = virtualHostRegistry; + _brokerStatisticsGatherer = brokerStatisticsGatherer; _vhostConfig = hostConfig; _name = _vhostConfig.getName(); _dtxRegistry = new DtxRegistry(); - _qmfId = _appRegistry.getConfigStore().createId(); _id = UUIDGenerator.generateVhostUUID(_name); CurrentActor.get().message(VirtualHostMessages.CREATED(_name)); - _securityManager = new SecurityManager(_appRegistry.getSecurityManager()); - _securityManager.configureHostPlugins(_vhostConfig); + _securityManager = new SecurityManager(parentSecurityManager, _vhostConfig.getConfig().getString("security.acl")); _connectionRegistry = new ConnectionRegistry(); _connectionRegistry.addRegistryChangeListener(this); @@ -154,13 +141,12 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr _queueRegistry = new DefaultQueueRegistry(this); _exchangeFactory = new DefaultExchangeFactory(this); - _exchangeFactory.initialise(_vhostConfig); _exchangeRegistry = new DefaultExchangeRegistry(this); _bindingFactory = new BindingFactory(this); - _messageStore = initialiseMessageStore(hostConfig.getMessageStoreClass()); + _messageStore = initialiseMessageStore(hostConfig); configureMessageStore(hostConfig); @@ -187,22 +173,6 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr return _id; } - @Override - public UUID getQMFId() - { - return _qmfId; - } - - public VirtualHostConfigType getConfigType() - { - return VirtualHostConfigType.getInstance(); - } - - public ConfiguredObject getParent() - { - return getBroker(); - } - public boolean isDurable() { return false; @@ -216,38 +186,9 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr */ private void initialiseHouseKeeping(long period) { - if (period != 0L) { scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask()); - - Map<String, VirtualHostPluginFactory> plugins = _appRegistry.getPluginManager().getVirtualHostPlugins(); - - if (plugins != null) - { - for (Map.Entry<String, VirtualHostPluginFactory> entry : plugins.entrySet()) - { - String pluginName = entry.getKey(); - VirtualHostPluginFactory factory = entry.getValue(); - try - { - VirtualHostPlugin plugin = factory.newInstance(this); - - // If we had configuration for the plugin the schedule it. - if (plugin != null) - { - _houseKeepingTasks.scheduleAtFixedRate(plugin, plugin.getDelay() / 2, - plugin.getDelay(), plugin.getTimeUnit()); - - _logger.info("Loaded VirtualHostPlugin:" + plugin); - } - } - catch (RuntimeException e) - { - _logger.error("Unable to load VirtualHostPlugin:" + pluginName + " due to:" + e.getMessage(), e); - } - } - } } } @@ -329,19 +270,34 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr if (!(o instanceof MessageStore)) { - throw new ClassCastException("Message store factory class must implement " + MessageStore.class + + throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz + " does not."); } final MessageStore messageStore = (MessageStore) o; - final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, clazz.getSimpleName()); + return messageStore; + } + + private MessageStore initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception + { + String storeType = hostConfig.getConfig().getString("store.type"); + MessageStore messageStore = null; + if (storeType == null) + { + messageStore = initialiseMessageStore(hostConfig.getMessageStoreClass()); + } + else + { + messageStore = new MessageStoreCreator().createMessageStore(storeType); + } + + final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStore.getClass().getSimpleName()); OperationalLoggingListener.listen(messageStore, storeLogSubject); messageStore.addEventListener(new BeforeActivationListener(), Event.BEFORE_ACTIVATE); messageStore.addEventListener(new AfterActivationListener(), Event.AFTER_ACTIVATE); messageStore.addEventListener(new BeforeCloseListener(), Event.BEFORE_CLOSE); messageStore.addEventListener(new BeforePassivationListener(), Event.BEFORE_PASSIVATE); - return messageStore; } @@ -468,16 +424,6 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr return _name; } - public BrokerConfig getBroker() - { - return _brokerConfig; - } - - public String getFederationTag() - { - return _brokerConfig.getFederationTag(); - } - public long getCreateTime() { return _createTime; @@ -534,14 +480,9 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr CurrentActor.get().message(VirtualHostMessages.CLOSED()); } - public UUID getBrokerId() - { - return _appRegistry.getBrokerId(); - } - - public IApplicationRegistry getApplicationRegistry() + public VirtualHostRegistry getVirtualHostRegistry() { - return _appRegistry; + return _virtualHostRegistry; } public BindingFactory getBindingFactory() @@ -553,14 +494,14 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr { _messagesDelivered.registerEvent(1L); _dataDelivered.registerEvent(messageSize); - _appRegistry.registerMessageDelivered(messageSize); + _brokerStatisticsGatherer.registerMessageDelivered(messageSize); } public void registerMessageReceived(long messageSize, long timestamp) { _messagesReceived.registerEvent(1L, timestamp); _dataReceived.registerEvent(messageSize, timestamp); - _appRegistry.registerMessageReceived(messageSize, timestamp); + _brokerStatisticsGatherer.registerMessageReceived(messageSize, timestamp); } public StatisticsCounter getMessageReceiptStatistics() @@ -604,51 +545,6 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr _dataReceived = new StatisticsCounter("bytes-received-" + getName()); } - public BrokerLink createBrokerConnection(UUID id, long createTime, Map<String,String> arguments) - { - BrokerLink blink = new BrokerLink(this, id, createTime, arguments); - // TODO - cope with duplicate broker link creation requests - _links.putIfAbsent(blink,blink); - getConfigStore().addConfiguredObject(blink); - return blink; - } - - public void createBrokerConnection(final String transport, - final String host, - final int port, - final String vhost, - final boolean durable, - final String authMechanism, - final String username, - final String password) - { - BrokerLink blink = new BrokerLink(this, transport, host, port, vhost, durable, authMechanism, username, password); - - // TODO - cope with duplicate broker link creation requests - _links.putIfAbsent(blink,blink); - getConfigStore().addConfiguredObject(blink); - - } - - public void removeBrokerConnection(final String transport, - final String host, - final int port, - final String vhost) - { - removeBrokerConnection(new BrokerLink(this, transport, host, port, vhost, false, null,null,null)); - - } - - public void removeBrokerConnection(BrokerLink blink) - { - blink = _links.get(blink); - if(blink != null) - { - blink.close(); - getConfigStore().removeConfiguredObject(blink); - } - } - public synchronized LinkRegistry getLinkRegistry(String remoteContainerId) { LinkRegistry linkRegistry = _linkRegistry.get(remoteContainerId); @@ -660,11 +556,6 @@ public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.Registr return linkRegistry; } - public ConfigStore getConfigStore() - { - return getApplicationRegistry().getConfigStore(); - } - public DtxRegistry getDtxRegistry() { return _dtxRegistry; |