diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java | 172 |
1 files changed, 93 insertions, 79 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index a4a3633af7..9a0606f47a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -20,21 +20,11 @@ */ package org.apache.qpid.server.virtualhost; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import javax.management.NotCompliantMBeanException; -import javax.management.ObjectName; - +import java.util.concurrent.ScheduledFuture; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; @@ -73,14 +63,25 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.txn.DtxRegistry; import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin; import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + public class VirtualHostImpl implements VirtualHost { private static final Logger _logger = Logger.getLogger(VirtualHostImpl.class); @@ -97,11 +98,11 @@ public class VirtualHostImpl implements VirtualHost private MessageStore _messageStore; - protected VirtualHostMBean _virtualHostMBean; + private DtxRegistry _dtxRegistry; - private AMQBrokerManagerMBean _brokerMBean; + private VirtualHostMBean _virtualHostMBean; - private final AuthenticationManager _authenticationManager; + private AMQBrokerManagerMBean _brokerMBean; private SecurityManager _securityManager; @@ -121,6 +122,7 @@ public class VirtualHostImpl implements VirtualHost private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5; private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>(); + public IConnectionRegistry getConnectionRegistry() { return _connectionRegistry; @@ -191,6 +193,7 @@ public class VirtualHostImpl implements VirtualHost _broker = _appRegistry.getBroker(); _configuration = hostConfig; _name = _configuration.getName(); + _dtxRegistry = new DtxRegistry(); _id = _appRegistry.getConfigStore().createId(); @@ -241,7 +244,6 @@ public class VirtualHostImpl implements VirtualHost initialiseMessageStore(hostConfig); } - _authenticationManager = ApplicationRegistry.getInstance().getAuthenticationManager(); _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); _brokerMBean.register(); @@ -260,54 +262,9 @@ public class VirtualHostImpl implements VirtualHost { if (period != 0L) { - class VirtualHostHouseKeepingTask extends HouseKeepingTask - { - public VirtualHostHouseKeepingTask(VirtualHost vhost) - { - super(vhost); - } - public void execute() - { - for (AMQQueue q : _queueRegistry.getQueues()) - { - _logger.debug("Checking message status for queue: " - + q.getName()); - try - { - q.checkMessageStatus(); - } - catch (Exception e) - { - _logger.error("Exception in housekeeping for queue: " - + q.getNameShortString().toString(), e); - //Don't throw exceptions as this will stop the - // house keeping task from running. - } - } - for (AMQConnectionModel connection : getConnectionRegistry().getConnections()) - { - _logger.debug("Checking for long running open transactions on connection " + connection); - for (AMQSessionModel session : connection.getSessionModels()) - { - _logger.debug("Checking for long running open transactions on session " + session); - try - { - session.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(), - _configuration.getTransactionTimeoutOpenClose(), - _configuration.getTransactionTimeoutIdleWarn(), - _configuration.getTransactionTimeoutIdleClose()); - } - catch (Exception e) - { - _logger.error("Exception in housekeeping for connection: " + connection.toString(), e); - } - } - } - } - } - scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask(this)); + scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask()); Map<String, VirtualHostPluginFactory> plugins = ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins(); @@ -340,6 +297,53 @@ public class VirtualHostImpl implements VirtualHost } } + private class VirtualHostHouseKeepingTask extends HouseKeepingTask + { + public VirtualHostHouseKeepingTask() + { + super(VirtualHostImpl.this); + } + + public void execute() + { + for (AMQQueue q : _queueRegistry.getQueues()) + { + _logger.debug("Checking message status for queue: " + + q.getName()); + try + { + q.checkMessageStatus(); + } + catch (Exception e) + { + _logger.error("Exception in housekeeping for queue: " + + q.getNameShortString().toString(), e); + //Don't throw exceptions as this will stop the + // house keeping task from running. + } + } + for (AMQConnectionModel connection : getConnectionRegistry().getConnections()) + { + _logger.debug("Checking for long running open transactions on connection " + connection); + for (AMQSessionModel session : connection.getSessionModels()) + { + _logger.debug("Checking for long running open transactions on session " + session); + try + { + session.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(), + _configuration.getTransactionTimeoutOpenClose(), + _configuration.getTransactionTimeoutIdleWarn(), + _configuration.getTransactionTimeoutIdleClose()); + } + catch (Exception e) + { + _logger.error("Exception in housekeeping for connection: " + connection.toString(), e); + } + } + } + } + } + /** * Allow other broker components to register a HouseKeepingTask * @@ -352,6 +356,11 @@ public class VirtualHostImpl implements VirtualHost TimeUnit.MILLISECONDS); } + public ScheduledFuture<?> scheduleTask(long delay, Runnable task) + { + return _houseKeepingTasks.schedule(task, delay, TimeUnit.MILLISECONDS); + } + public long getHouseKeepingTaskCount() { return _houseKeepingTasks.getTaskCount(); @@ -575,11 +584,6 @@ public class VirtualHostImpl implements VirtualHost return _durableConfigurationStore; } - public AuthenticationManager getAuthenticationManager() - { - return _authenticationManager; - } - public SecurityManager getSecurityManager() { return _securityManager; @@ -618,6 +622,11 @@ public class VirtualHostImpl implements VirtualHost } } + if(_dtxRegistry != null) + { + _dtxRegistry.close(); + } + //Close MessageStore if (_messageStore != null) { @@ -796,6 +805,11 @@ public class VirtualHostImpl implements VirtualHost return getApplicationRegistry().getConfigStore(); } + public DtxRegistry getDtxRegistry() + { + return _dtxRegistry; + } + /** * Temporary Startup RT class to record the creation of persistent queues / exchanges. * @@ -805,11 +819,11 @@ public class VirtualHostImpl implements VirtualHost */ private static class StartupRoutingTable implements DurableConfigurationStore { - public List<Exchange> exchange = new LinkedList<Exchange>(); - public List<CreateQueueTuple> queue = new LinkedList<CreateQueueTuple>(); - public List<CreateBindingTuple> bindings = new LinkedList<CreateBindingTuple>(); - public List<BrokerLink> links = new LinkedList<BrokerLink>(); - public List<Bridge> bridges = new LinkedList<Bridge>(); + private List<Exchange> exchange = new LinkedList<Exchange>(); + private List<CreateQueueTuple> queue = new LinkedList<CreateQueueTuple>(); + private List<CreateBindingTuple> bindings = new LinkedList<CreateBindingTuple>(); + private List<BrokerLink> links = new LinkedList<BrokerLink>(); + private List<Bridge> bridges = new LinkedList<Bridge>(); public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception { @@ -876,8 +890,8 @@ public class VirtualHostImpl implements VirtualHost private static class CreateQueueTuple { - public AMQQueue queue; - public FieldTable arguments; + private AMQQueue queue; + private FieldTable arguments; public CreateQueueTuple(AMQQueue queue, FieldTable arguments) { @@ -888,10 +902,10 @@ public class VirtualHostImpl implements VirtualHost private static class CreateBindingTuple { - public AMQQueue queue; - public FieldTable arguments; - public Exchange exchange; - public AMQShortString routingKey; + private AMQQueue queue; + private FieldTable arguments; + private Exchange exchange; + private AMQShortString routingKey; public CreateBindingTuple(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) { |