summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java257
1 files changed, 92 insertions, 165 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index 785d4610eb..fcad1550e1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -59,7 +59,6 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsCounter;
@@ -72,7 +71,6 @@ import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -84,8 +82,26 @@ public class VirtualHostImpl implements VirtualHost
{
private static final Logger _logger = Logger.getLogger(VirtualHostImpl.class);
+ private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5;
+
+ private final UUID _id;
+
private final String _name;
+ private final long _createTime = System.currentTimeMillis();
+
+ private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>();
+
+ private final ScheduledThreadPoolExecutor _houseKeepingTasks;
+
+ private final IApplicationRegistry _appRegistry;
+
+ private final SecurityManager _securityManager;
+
+ private final BrokerConfig _brokerConfig;
+
+ private final VirtualHostConfiguration _configuration;
+
private ConnectionRegistry _connectionRegistry;
private QueueRegistry _queueRegistry;
@@ -102,92 +118,23 @@ public class VirtualHostImpl implements VirtualHost
private AMQBrokerManagerMBean _brokerMBean;
- private SecurityManager _securityManager;
- private final ScheduledThreadPoolExecutor _houseKeepingTasks;
- private final IApplicationRegistry _appRegistry;
- private VirtualHostConfiguration _configuration;
private DurableConfigurationStore _durableConfigurationStore;
private BindingFactory _bindingFactory;
- private BrokerConfig _broker;
- private UUID _id;
private boolean _statisticsEnabled = false;
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
- private final long _createTime = System.currentTimeMillis();
- private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>();
- private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5;
-
-
- public IConnectionRegistry getConnectionRegistry()
- {
- return _connectionRegistry;
- }
-
- public VirtualHostConfiguration getConfiguration()
- {
- return _configuration;
- }
-
- public UUID getId()
- {
- return _id;
- }
-
- public VirtualHostConfigType getConfigType()
- {
- return VirtualHostConfigType.getInstance();
- }
-
- public ConfiguredObject getParent()
- {
- return getBroker();
- }
- public boolean isDurable()
- {
- return false;
- }
-
- /**
- * Virtual host JMX MBean class.
- *
- * This has some of the methods implemented from management intrerface for exchanges. Any
- * implementaion of an Exchange MBean should extend this class.
- */
- public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
+ public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig, MessageStore store) throws Exception
{
- public VirtualHostMBean() throws NotCompliantMBeanException
- {
- super(ManagedVirtualHost.class, ManagedVirtualHost.TYPE);
- }
-
- public String getObjectInstanceName()
- {
- return ObjectName.quote(_name);
- }
-
- public String getName()
+ if (hostConfig == null)
{
- return _name;
+ throw new IllegalArgumentException("HostConfig cannot be null");
}
- public VirtualHostImpl getVirtualHost()
- {
- return VirtualHostImpl.this;
- }
- }
-
- public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig, MessageStore store) throws Exception
- {
- if (hostConfig == null)
- {
- throw new IllegalArgumentException("HostConfig cannot be null");
- }
-
_appRegistry = appRegistry;
- _broker = _appRegistry.getBroker();
+ _brokerConfig = _appRegistry.getBroker();
_configuration = hostConfig;
_name = _configuration.getName();
_dtxRegistry = new DtxRegistry();
@@ -198,7 +145,7 @@ public class VirtualHostImpl implements VirtualHost
if (_name == null || _name.length() == 0)
{
- throw new IllegalArgumentException("Illegal name (" + _name + ") for virtualhost.");
+ throw new IllegalArgumentException("Illegal name (" + _name + ") for virtualhost.");
}
_securityManager = new SecurityManager(_appRegistry.getSecurityManager());
@@ -238,17 +185,76 @@ public class VirtualHostImpl implements VirtualHost
}
else
{
- initialiseMessageStore(hostConfig);
+ initialiseMessageStore(hostConfig);
}
-
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
_brokerMBean.register();
initialiseHouseKeeping(hostConfig.getHousekeepingCheckPeriod());
-
+
initialiseStatistics();
}
+ public IConnectionRegistry getConnectionRegistry()
+ {
+ return _connectionRegistry;
+ }
+
+ public VirtualHostConfiguration getConfiguration()
+ {
+ return _configuration;
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public VirtualHostConfigType getConfigType()
+ {
+ return VirtualHostConfigType.getInstance();
+ }
+
+ public ConfiguredObject getParent()
+ {
+ return getBroker();
+ }
+
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ /**
+ * Virtual host JMX MBean class.
+ *
+ * This has some of the methods implemented from management intrerface for exchanges. Any
+ * implementaion of an Exchange MBean should extend this class.
+ */
+ public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
+ {
+ public VirtualHostMBean() throws NotCompliantMBeanException
+ {
+ super(ManagedVirtualHost.class, ManagedVirtualHost.TYPE);
+ }
+
+ public String getObjectInstanceName()
+ {
+ return ObjectName.quote(_name);
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public VirtualHostImpl getVirtualHost()
+ {
+ return VirtualHostImpl.this;
+ }
+ }
+
+
/**
* Initialise a housekeeping task to iterate over queues cleaning expired messages with no consumers
* and checking for idle or open transactions that have exceeded the permitted thresholds.
@@ -263,8 +269,7 @@ public class VirtualHostImpl implements VirtualHost
scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask());
- Map<String, VirtualHostPluginFactory> plugins =
- ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins();
+ Map<String, VirtualHostPluginFactory> plugins = _appRegistry.getPluginManager().getVirtualHostPlugins();
if (plugins != null)
{
@@ -389,11 +394,9 @@ public class VirtualHostImpl implements VirtualHost
{
String messageStoreClass = hostConfig.getMessageStoreClass();
- Class clazz = Class.forName(messageStoreClass);
+ Class<?> clazz = Class.forName(messageStoreClass);
Object o = clazz.newInstance();
-
-
if (!(o instanceof MessageStore))
{
throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
@@ -435,11 +438,10 @@ public class VirtualHostImpl implements VirtualHost
{
_logger.debug("Loading configuration for virtualhost: " + config.getName());
- List exchangeNames = config.getExchanges();
+ List<String> exchangeNames = config.getExchanges();
- for (Object exchangeNameObj : exchangeNames)
+ for (String exchangeName : exchangeNames)
{
- String exchangeName = String.valueOf(exchangeNameObj);
configureExchange(config.getExchangeConfiguration(exchangeName));
}
@@ -538,17 +540,12 @@ public class VirtualHostImpl implements VirtualHost
public BrokerConfig getBroker()
{
- return _broker;
+ return _brokerConfig;
}
public String getFederationTag()
{
- return _broker.getFederationTag();
- }
-
- public void setBroker(final BrokerConfig broker)
- {
- _broker = broker;
+ return _brokerConfig.getFederationTag();
}
public long getCreateTime()
@@ -634,7 +631,7 @@ public class VirtualHostImpl implements VirtualHost
}
catch (Exception e)
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ _logger.error("Failed to close message store", e);
}
}
@@ -805,39 +802,15 @@ public class VirtualHostImpl implements VirtualHost
*/
private static class StartupRoutingTable implements DurableConfigurationStore
{
- private List<Exchange> exchange = new LinkedList<Exchange>();
- private List<CreateQueueTuple> queue = new LinkedList<CreateQueueTuple>();
- private List<CreateBindingTuple> bindings = new LinkedList<CreateBindingTuple>();
- private List<BrokerLink> links = new LinkedList<BrokerLink>();
- private List<Bridge> bridges = new LinkedList<Bridge>();
-
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
- {
- }
-
- public void close() throws Exception
- {
- }
-
- public void removeMessage(Long messageId) throws AMQException
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
public void configureConfigStore(String name,
ConfigurationRecoveryHandler recoveryHandler,
Configuration config,
LogSubject logSubject) throws Exception
{
- //To change body of implemented methods use File | Settings | File Templates.
}
public void createExchange(Exchange exchange) throws AMQStoreException
{
- if (exchange.isDurable())
- {
- this.exchange.add(exchange);
- }
}
public void removeExchange(Exchange exchange) throws AMQStoreException
@@ -846,10 +819,6 @@ public class VirtualHostImpl implements VirtualHost
public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
{
- if (exchange.isDurable() && queue.isDurable())
- {
- bindings.add(new CreateBindingTuple(exchange, routingKey, queue, args));
- }
}
public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
@@ -858,60 +827,22 @@ public class VirtualHostImpl implements VirtualHost
public void createQueue(AMQQueue queue) throws AMQStoreException
{
- createQueue(queue, null);
}
public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
{
- if (queue.isDurable())
- {
- this.queue.add(new CreateQueueTuple(queue, arguments));
- }
}
public void removeQueue(AMQQueue queue) throws AMQStoreException
{
}
-
- private static class CreateQueueTuple
- {
- private AMQQueue queue;
- private FieldTable arguments;
-
- public CreateQueueTuple(AMQQueue queue, FieldTable arguments)
- {
- this.queue = queue;
- this.arguments = arguments;
- }
- }
-
- private static class CreateBindingTuple
- {
- private AMQQueue queue;
- private FieldTable arguments;
- private Exchange exchange;
- private AMQShortString routingKey;
-
- public CreateBindingTuple(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
- {
- this.exchange = exchange;
- this.routingKey = routingKey;
- this.queue = queue;
- arguments = args;
- }
- }
-
public void updateQueue(AMQQueue queue) throws AMQStoreException
{
}
public void createBrokerLink(final BrokerLink link) throws AMQStoreException
{
- if(link.isDurable())
- {
- links.add(link);
- }
}
public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
@@ -920,10 +851,6 @@ public class VirtualHostImpl implements VirtualHost
public void createBridge(final Bridge bridge) throws AMQStoreException
{
- if(bridge.isDurable())
- {
- bridges.add(bridge);
- }
}
public void deleteBridge(final Bridge bridge) throws AMQStoreException