diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java | 338 |
1 files changed, 118 insertions, 220 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index bd3e5b1f72..6c72025ec2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -20,263 +20,161 @@ */ package org.apache.qpid.server.configuration; -import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.commons.configuration.CompositeConfiguration; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.XMLConfiguration; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.store.MemoryMessageStore; public class VirtualHostConfiguration { - private static final Logger _logger = Logger.getLogger(VirtualHostConfiguration.class); + private Configuration _config; + private String _name; + private Map<String, QueueConfiguration> _queues = new HashMap<String, QueueConfiguration>(); + private Map<String, ExchangeConfiguration> _exchanges = new HashMap<String, ExchangeConfiguration>(); + + public VirtualHostConfiguration(String name, Configuration config) throws ConfigurationException + { + _config = config; + _name = name; + Iterator i = _config.getList("queues.queue.name").iterator(); + + while (i.hasNext()) + { + String queueName = (String) i.next(); + CompositeConfiguration mungedConf = new CompositeConfiguration(); + mungedConf.addConfiguration(_config.subset("queues.queue." + queueName)); + mungedConf.addConfiguration(_config.subset("queues")); + _queues.put(queueName, new QueueConfiguration(queueName, mungedConf, this)); + } - private static XMLConfiguration _config; + i = _config.getList("exchanges.exchange.name").iterator(); + int count = 0; + while (i.hasNext()) + { + CompositeConfiguration mungedConf = new CompositeConfiguration(); + mungedConf.addConfiguration(config.subset("exchanges.exchange(" + count++ + ")")); + mungedConf.addConfiguration(_config.subset("exchanges")); + String exchName = (String) i.next(); + _exchanges.put(exchName, new ExchangeConfiguration(exchName, mungedConf)); + } - private static final String VIRTUALHOST_PROPERTY_BASE = "virtualhost."; + } + public String getName() + { + return _name; + } - public VirtualHostConfiguration(String configFile) throws ConfigurationException + public long getHousekeepingExpiredMessageCheckPeriod() + { + return _config.getLong("housekeeping.expiredMessageCheckPeriod", ApplicationRegistry.getInstance().getConfiguration().getHousekeepingCheckPeriod()); + } + + public String getAuthenticationDatabase() + { + return _config.getString("security.authentication.name"); + } + + public List getCustomExchanges() + { + return _config.getList("custom-exchanges.class-name"); + } + + public SecurityConfiguration getSecurityConfiguration() + { + return new SecurityConfiguration(_config.subset("security")); + } + + public Configuration getStoreConfiguration() + { + return _config.subset("store"); + } + + public String getMessageStoreClass() + { + return _config.getString("store.class", MemoryMessageStore.class.getName()); + } + + public List getExchanges() + { + return _config.getList("exchanges.exchange.name"); + } + + public String[] getQueueNames() + { + return _queues.keySet().toArray(new String[_queues.size()]); + } + + public ExchangeConfiguration getExchangeConfiguration(String exchangeName) { - _logger.info("Loading Config file:" + configFile); - - _config = new XMLConfiguration(configFile); - + return _exchanges.get(exchangeName); } - - - private void configureVirtualHost(String virtualHostName, Configuration configuration) throws ConfigurationException, AMQException + public QueueConfiguration getQueueConfiguration(String queueName) { - _logger.debug("Loding configuration for virtaulhost: "+virtualHostName); - - - VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(virtualHostName); - - - - if(virtualHost == null) + // We might be asked for the config for a queue we don't know about, + // such as one that's been dynamically created. Those get the defaults by default. + if (_queues.containsKey(queueName)) { - throw new ConfigurationException("Unknown virtual host: " + virtualHostName); - } - - List exchangeNames = configuration.getList("exchanges.exchange.name"); - - for(Object exchangeNameObj : exchangeNames) - { - String exchangeName = String.valueOf(exchangeNameObj); - configureExchange(virtualHost, exchangeName, configuration); - } - - - List queueNames = configuration.getList("queues.queue.name"); - - for(Object queueNameObj : queueNames) + return _queues.get(queueName); + } + else { - String queueName = String.valueOf(queueNameObj); - configureQueue(virtualHost, queueName, configuration); + return new QueueConfiguration(queueName, new PropertiesConfiguration(), this); } - } - private void configureExchange(VirtualHost virtualHost, String exchangeNameString, Configuration configuration) throws AMQException + public long getMemoryUsageMaximum() { - - CompositeConfiguration exchangeConfiguration = new CompositeConfiguration(); - - exchangeConfiguration.addConfiguration(configuration.subset("exchanges.exchange."+ exchangeNameString)); - exchangeConfiguration.addConfiguration(configuration.subset("exchanges")); - - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - MessageStore messageStore = virtualHost.getMessageStore(); - ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); - ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory(); - - AMQShortString exchangeName = new AMQShortString(exchangeNameString); - - - Exchange exchange; - - - - synchronized (exchangeRegistry) - { - exchange = exchangeRegistry.getExchange(exchangeName); - if(exchange == null) - { - - AMQShortString type = new AMQShortString(exchangeConfiguration.getString("type","direct")); - boolean durable = exchangeConfiguration.getBoolean("durable",false); - boolean autodelete = exchangeConfiguration.getBoolean("autodelete",false); - - Exchange newExchange = exchangeFactory.createExchange(exchangeName,type,durable,autodelete,0); - exchangeRegistry.registerExchange(newExchange); - } - - } + return _config.getLong("queues.maximumMemoryUsage", 0); } - public static CompositeConfiguration getDefaultQueueConfiguration(AMQQueue queue) + public long getMemoryUsageMinimum() { - CompositeConfiguration queueConfiguration = null; - if (_config == null) - return null; - - Configuration vHostConfiguration = _config.subset(VIRTUALHOST_PROPERTY_BASE + queue.getVirtualHost().getName()); - - if (vHostConfiguration == null) - return null; - - Configuration defaultQueueConfiguration = vHostConfiguration.subset("queues"); - if (defaultQueueConfiguration != null) - { - queueConfiguration = new CompositeConfiguration(); - queueConfiguration.addConfiguration(defaultQueueConfiguration); - } - - return queueConfiguration; + return _config.getLong("queues.minimumMemoryUsage", 0); } - - private void configureQueue(VirtualHost virtualHost, String queueNameString, Configuration configuration) throws AMQException, ConfigurationException + + public int getMaximumMessageAge() { - CompositeConfiguration queueConfiguration = new CompositeConfiguration(); - - queueConfiguration.addConfiguration(configuration.subset("queues.queue."+ queueNameString)); - queueConfiguration.addConfiguration(configuration.subset("queues")); - - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - MessageStore messageStore = virtualHost.getMessageStore(); - ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); - - - AMQShortString queueName = new AMQShortString(queueNameString); - - AMQQueue queue; - - synchronized (queueRegistry) - { - queue = queueRegistry.getQueue(queueName); - - if (queue == null) - { - _logger.info("Creating queue '" + queueName + "' on virtual host " + virtualHost.getName()); - - boolean durable = queueConfiguration.getBoolean("durable" ,false); - boolean autodelete = queueConfiguration.getBoolean("autodelete", false); - String owner = queueConfiguration.getString("owner", null); - FieldTable arguments = null; - Integer priorities = queueConfiguration.getInteger("priorities", null); - if(priorities != null && priorities.intValue() > 1) - { - if(arguments == null) - { - arguments = new FieldTable(); - } - arguments.put(new AMQShortString("x-qpid-priorities"), priorities); - } - - - queue = AMQQueueFactory.createAMQQueueImpl(queueName, - durable, - owner == null ? null : new AMQShortString(owner) /* These queues will have no owner */, - autodelete /* Therefore autodelete makes no sence */, virtualHost, arguments); - - if (queue.isDurable()) - { - messageStore.createQueue(queue); - } - - queueRegistry.registerQueue(queue); - } - else - { - _logger.info("Queue '" + queueNameString + "' already exists on virtual host "+virtualHost.getName()+", not creating."); - } - - String exchangeName = queueConfiguration.getString("exchange", null); - - Exchange exchange = exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName)); - - if(exchange == null) - { - exchange = virtualHost.getExchangeRegistry().getDefaultExchange(); - } - - if (exchange == null) - { - throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName); - } - - synchronized (exchange) - { - List routingKeys = queueConfiguration.getList("routingKey"); - if(routingKeys == null || routingKeys.isEmpty()) - { - routingKeys = Collections.singletonList(queue.getName()); - } - - for(Object routingKeyNameObj : routingKeys) - { - AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj)); - - - queue.bind(exchange, routingKey, null); - - - _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'"); - } - - if(exchange != virtualHost.getExchangeRegistry().getDefaultExchange()) - { - queue.bind(virtualHost.getExchangeRegistry().getDefaultExchange(), queue.getName(), null); - } - } - - } - + return _config.getInt("queues.maximumMessageAge", 0); + } + public Long getMaximumQueueDepth() + { + return _config.getLong("queues.maximumQueueDepth", 0); + } - Configurator.configure(queue, queueConfiguration); + public Long getMaximumMessageSize() + { + return _config.getLong("queues.maximumMessageSize", 0); } + public Long getMaximumMessageCount() + { + return _config.getLong("queues.maximumMessageCount", 0); + } - public void performBindings() throws AMQException, ConfigurationException + public Long getMinimumAlertRepeatGap() { - List virtualHostNames = _config.getList(VIRTUALHOST_PROPERTY_BASE + "name"); - String defaultVirtualHostName = _config.getString("default"); - if(defaultVirtualHostName != null) - { - ApplicationRegistry.getInstance().getVirtualHostRegistry().setDefaultVirtualHostName(defaultVirtualHostName); - } - _logger.info("Configuring " + virtualHostNames == null ? 0 : virtualHostNames.size() + " virtual hosts: " + virtualHostNames); + return _config.getLong("queues.minimumAlertRepeatGap", 0); + } - for(Object nameObject : virtualHostNames) - { - String name = String.valueOf(nameObject); - configureVirtualHost(name, _config.subset(VIRTUALHOST_PROPERTY_BASE + name)); - } - if (virtualHostNames == null || virtualHostNames.isEmpty()) - { - throw new ConfigurationException( - "Virtualhost Configuration document does not contain a valid virtualhost."); - } + public long getCapacity() + { + return _config.getLong("queues.capacity", 0l); } - + public long getFlowResumeCapacity() + { + return _config.getLong("queues.flowResumeCapacity", getCapacity()); + } } |