diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java | 81 |
1 files changed, 42 insertions, 39 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index 33c713c62a..17c65003e9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.virtualhost; -import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -72,7 +71,6 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; -import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.DurableConfigurationStore; @@ -179,22 +177,11 @@ public class VirtualHostImpl implements VirtualHost } } - public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig) throws Exception - { - this(appRegistry, hostConfig, null); - } - - - public VirtualHostImpl(VirtualHostConfiguration hostConfig, MessageStore store) throws Exception - { - this(ApplicationRegistry.getInstance(),hostConfig,store); - } - - private VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig, MessageStore store) throws Exception + public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig, MessageStore store) throws Exception { if (hostConfig == null) { - throw new IllegalAccessException("HostConfig and MessageStore cannot be null"); + throw new IllegalArgumentException("HostConfig cannot be null"); } _appRegistry = appRegistry; @@ -252,19 +239,24 @@ public class VirtualHostImpl implements VirtualHost _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); _brokerMBean.register(); - initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod()); + initialiseHouseKeeping(hostConfig.getHousekeepingCheckPeriod()); initialiseStatistics(); } + /** + * Initialise a housekeeping task to iterate over queues cleaning expired messages with no consumers + * and checking for idle or open transactions that have exceeded the permitted thresholds. + * + * @param period + */ private void initialiseHouseKeeping(long period) { - /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */ if (period != 0L) { - class ExpiredMessagesTask extends HouseKeepingTask + class VirtualHostHouseKeepingTask extends HouseKeepingTask { - public ExpiredMessagesTask(VirtualHost vhost) + public VirtualHostHouseKeepingTask(VirtualHost vhost) { super(vhost); } @@ -309,7 +301,7 @@ public class VirtualHostImpl implements VirtualHost } } - scheduleHouseKeepingTask(period, new ExpiredMessagesTask(this)); + scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask(this)); Map<String, VirtualHostPluginFactory> plugins = ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins(); @@ -463,46 +455,57 @@ public class VirtualHostImpl implements VirtualHost private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException { AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this); + String queueName = queue.getName(); if (queue.isDurable()) { getDurableConfigurationStore().createQueue(queue); } + //get the exchange name (returns default exchange name if none was specified) String exchangeName = queueConfiguration.getExchange(); - Exchange exchange = _exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName)); - - if (exchange == null) - { - exchange = _exchangeRegistry.getDefaultExchange(); - } - + Exchange exchange = _exchangeRegistry.getExchange(exchangeName); if (exchange == null) { - throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName); + throw new ConfigurationException("Attempt to bind queue '" + queueName + "' to unknown exchange:" + exchangeName); } - List routingKeys = queueConfiguration.getRoutingKeys(); - if (routingKeys == null || routingKeys.isEmpty()) - { - routingKeys = Collections.singletonList(queue.getNameShortString()); - } + Exchange defaultExchange = _exchangeRegistry.getDefaultExchange(); + + //get routing keys in configuration (returns empty list if none are defined) + List<?> routingKeys = queueConfiguration.getRoutingKeys(); for (Object routingKeyNameObj : routingKeys) { - AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj)); - if (_logger.isInfoEnabled()) + String routingKey = String.valueOf(routingKeyNameObj); + + if (exchange.equals(defaultExchange) && !queueName.equals(routingKey)) { - _logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + this); + throw new ConfigurationException("Illegal attempt to bind queue '" + queueName + + "' to the default exchange with a key other than the queue name: " + routingKey); } - _bindingFactory.addBinding(routingKey.toString(), queue, exchange, null); + + configureBinding(queue, exchange, routingKey); } - if (exchange != _exchangeRegistry.getDefaultExchange()) + if (!exchange.equals(defaultExchange)) + { + //bind the queue to the named exchange using its name + configureBinding(queue, exchange, queueName); + } + + //ensure the queue is bound to the default exchange using its name + configureBinding(queue, defaultExchange, queueName); + } + + private void configureBinding(AMQQueue queue, Exchange exchange, String routingKey) throws AMQException + { + if (_logger.isInfoEnabled()) { - _bindingFactory.addBinding(queue.getNameShortString().toString(), queue, exchange, null); + _logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + exchange.getName()); } + _bindingFactory.addBinding(routingKey, queue, exchange, null); } public String getName() |