summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java81
1 files changed, 42 insertions, 39 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index 33c713c62a..17c65003e9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.virtualhost;
-import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -72,7 +71,6 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.DurableConfigurationStore;
@@ -179,22 +177,11 @@ public class VirtualHostImpl implements VirtualHost
}
}
- public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig) throws Exception
- {
- this(appRegistry, hostConfig, null);
- }
-
-
- public VirtualHostImpl(VirtualHostConfiguration hostConfig, MessageStore store) throws Exception
- {
- this(ApplicationRegistry.getInstance(),hostConfig,store);
- }
-
- private VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig, MessageStore store) throws Exception
+ public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig, MessageStore store) throws Exception
{
if (hostConfig == null)
{
- throw new IllegalAccessException("HostConfig and MessageStore cannot be null");
+ throw new IllegalArgumentException("HostConfig cannot be null");
}
_appRegistry = appRegistry;
@@ -252,19 +239,24 @@ public class VirtualHostImpl implements VirtualHost
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
_brokerMBean.register();
- initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod());
+ initialiseHouseKeeping(hostConfig.getHousekeepingCheckPeriod());
initialiseStatistics();
}
+ /**
+ * Initialise a housekeeping task to iterate over queues cleaning expired messages with no consumers
+ * and checking for idle or open transactions that have exceeded the permitted thresholds.
+ *
+ * @param period
+ */
private void initialiseHouseKeeping(long period)
{
- /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */
if (period != 0L)
{
- class ExpiredMessagesTask extends HouseKeepingTask
+ class VirtualHostHouseKeepingTask extends HouseKeepingTask
{
- public ExpiredMessagesTask(VirtualHost vhost)
+ public VirtualHostHouseKeepingTask(VirtualHost vhost)
{
super(vhost);
}
@@ -309,7 +301,7 @@ public class VirtualHostImpl implements VirtualHost
}
}
- scheduleHouseKeepingTask(period, new ExpiredMessagesTask(this));
+ scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask(this));
Map<String, VirtualHostPluginFactory> plugins =
ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins();
@@ -463,46 +455,57 @@ public class VirtualHostImpl implements VirtualHost
private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this);
+ String queueName = queue.getName();
if (queue.isDurable())
{
getDurableConfigurationStore().createQueue(queue);
}
+ //get the exchange name (returns default exchange name if none was specified)
String exchangeName = queueConfiguration.getExchange();
- Exchange exchange = _exchangeRegistry.getExchange(exchangeName == null ? null : new AMQShortString(exchangeName));
-
- if (exchange == null)
- {
- exchange = _exchangeRegistry.getDefaultExchange();
- }
-
+ Exchange exchange = _exchangeRegistry.getExchange(exchangeName);
if (exchange == null)
{
- throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName);
+ throw new ConfigurationException("Attempt to bind queue '" + queueName + "' to unknown exchange:" + exchangeName);
}
- List routingKeys = queueConfiguration.getRoutingKeys();
- if (routingKeys == null || routingKeys.isEmpty())
- {
- routingKeys = Collections.singletonList(queue.getNameShortString());
- }
+ Exchange defaultExchange = _exchangeRegistry.getDefaultExchange();
+
+ //get routing keys in configuration (returns empty list if none are defined)
+ List<?> routingKeys = queueConfiguration.getRoutingKeys();
for (Object routingKeyNameObj : routingKeys)
{
- AMQShortString routingKey = new AMQShortString(String.valueOf(routingKeyNameObj));
- if (_logger.isInfoEnabled())
+ String routingKey = String.valueOf(routingKeyNameObj);
+
+ if (exchange.equals(defaultExchange) && !queueName.equals(routingKey))
{
- _logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + this);
+ throw new ConfigurationException("Illegal attempt to bind queue '" + queueName +
+ "' to the default exchange with a key other than the queue name: " + routingKey);
}
- _bindingFactory.addBinding(routingKey.toString(), queue, exchange, null);
+
+ configureBinding(queue, exchange, routingKey);
}
- if (exchange != _exchangeRegistry.getDefaultExchange())
+ if (!exchange.equals(defaultExchange))
+ {
+ //bind the queue to the named exchange using its name
+ configureBinding(queue, exchange, queueName);
+ }
+
+ //ensure the queue is bound to the default exchange using its name
+ configureBinding(queue, defaultExchange, queueName);
+ }
+
+ private void configureBinding(AMQQueue queue, Exchange exchange, String routingKey) throws AMQException
+ {
+ if (_logger.isInfoEnabled())
{
- _bindingFactory.addBinding(queue.getNameShortString().toString(), queue, exchange, null);
+ _logger.info("Binding queue:" + queue + " with routing key '" + routingKey + "' to exchange:" + exchange.getName());
}
+ _bindingFactory.addBinding(routingKey, queue, exchange, null);
}
public String getName()