diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost')
12 files changed, 338 insertions, 160 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java index ebace95f65..523bafb8e1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java @@ -21,13 +21,14 @@ package org.apache.qpid.server.virtualhost; import org.apache.log4j.Logger; + import org.apache.qpid.server.logging.RootMessageLogger; import org.apache.qpid.server.logging.actors.AbstractActor; import org.apache.qpid.server.logging.actors.CurrentActor; public abstract class HouseKeepingTask implements Runnable { - Logger _logger = Logger.getLogger(this.getClass()); + private Logger _logger = Logger.getLogger(this.getClass()); private VirtualHost _virtualHost; @@ -59,7 +60,7 @@ public abstract class HouseKeepingTask implements Runnable { execute(); } - catch (Throwable e) + catch (Exception e) { _logger.warn(this.getClass().getSimpleName() + " throw exception: " + e, e); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java index 767474d5ae..cb7f213f06 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java @@ -20,10 +20,10 @@ */
package org.apache.qpid.server.virtualhost;
-import java.io.IOException;
-
import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute;
+import java.io.IOException;
+
/**
* The management interface exposed to allow management of a virtualHost
*/
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 41a5471a64..4b586db628 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -22,7 +22,7 @@ package org.apache.qpid.server.virtualhost; import java.util.Map; import java.util.UUID; - +import java.util.concurrent.ScheduledFuture; import org.apache.qpid.common.Closeable; import org.apache.qpid.server.binding.BindingFactory; import org.apache.qpid.server.configuration.ConfigStore; @@ -37,10 +37,10 @@ import org.apache.qpid.server.protocol.v1_0.LinkRegistry; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.txn.DtxRegistry; public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable, StatisticsGatherer { @@ -60,8 +60,6 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo DurableConfigurationStore getDurableConfigurationStore(); - AuthenticationManager getAuthenticationManager(); - SecurityManager getSecurityManager(); void close(); @@ -97,7 +95,11 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo ConfigStore getConfigStore(); + DtxRegistry getDtxRegistry(); + void removeBrokerConnection(BrokerLink brokerLink); LinkRegistry getLinkRegistry(String remoteContainerId); + + ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index 51892d965a..0e965472d5 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -20,43 +20,47 @@ */ package org.apache.qpid.server.virtualhost; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.UUID; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.binding.BindingFactory; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.federation.BrokerLink; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.TransactionLogMessages; +import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; +import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.AbstractServerMessageImpl; import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.message.MessageTransferMessage; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreRecoveryHandler; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQQueueFactory; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.messages.TransactionLogMessages; -import org.apache.qpid.server.message.AMQMessage; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.message.MessageTransferMessage; -import org.apache.qpid.server.binding.BindingFactory; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.AMQException; - -import org.apache.log4j.Logger; +import org.apache.qpid.server.txn.DtxBranch; +import org.apache.qpid.server.txn.DtxRegistry; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.transport.Xid; +import org.apache.qpid.transport.util.Functions; import org.apache.qpid.util.ByteBufferInputStream; -import java.io.DataInputStream; -import java.io.IOException; -import java.nio.ByteBuffer; - -import java.util.List; -import java.util.ArrayList; -import java.util.Map; -import java.util.HashMap; -import java.util.TreeMap; -import java.util.UUID; - public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler, ConfigurationRecoveryHandler.QueueRecoveryHandler, ConfigurationRecoveryHandler.ExchangeRecoveryHandler, @@ -65,7 +69,8 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa MessageStoreRecoveryHandler, MessageStoreRecoveryHandler.StoredMessageRecoveryHandler, TransactionLogRecoveryHandler, - TransactionLogRecoveryHandler.QueueEntryRecoveryHandler + TransactionLogRecoveryHandler.QueueEntryRecoveryHandler, + TransactionLogRecoveryHandler.DtxRecordRecoveryHandler { private static final Logger _logger = Logger.getLogger(VirtualHostConfigRecoveryHandler.class); @@ -78,7 +83,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa private MessageStore _store; private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>(); - private Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>(); + private Map<Long, AbstractServerMessageImpl> _recoveredMessages = new HashMap<Long, AbstractServerMessageImpl>(); private Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>(); @@ -160,7 +165,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa public void message(StoredMessage message) { - ServerMessage serverMessage; + AbstractServerMessageImpl serverMessage; switch(message.getMetaData().getType()) { case META_DATA_0_8: @@ -173,9 +178,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa throw new RuntimeException("Unknown message type retrieved from store " + message.getMetaData().getClass()); } - //_logger.debug("Recovered message with id " + serverMessage); - - _recoveredMessages.put(message.getMessageNumber(), serverMessage); _unusedMessages.put(message.getMessageNumber(), message); } @@ -198,6 +200,164 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa { } + public void dtxRecord(long format, byte[] globalId, byte[] branchId, + MessageStore.Transaction.Record[] enqueues, + MessageStore.Transaction.Record[] dequeues) + { + Xid id = new Xid(format, globalId, branchId); + DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry(); + DtxBranch branch = dtxRegistry.getBranch(id); + if(branch == null) + { + branch = new DtxBranch(id, _store, _virtualHost); + dtxRegistry.registerBranch(branch); + } + for(MessageStore.Transaction.Record record : enqueues) + { + final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName()); + if(queue != null) + { + final long messageId = record.getMessage().getMessageNumber(); + final AbstractServerMessageImpl message = _recoveredMessages.get(messageId); + _unusedMessages.remove(messageId); + + if(message != null) + { + message.incrementReference(); + + branch.enqueue(queue,message); + + branch.addPostTransactionAcion(new ServerTransaction.Action() + { + + public void postCommit() + { + try + { + + queue.enqueue(message, true, null); + message.decrementReference(); + } + catch (AMQException e) + { + _logger.error("Unable to enqueue message " + message.getMessageNumber() + " into " + + "queue " + queue.getName() + " (from XA transaction)", e); + throw new RuntimeException(e); + } + } + + public void onRollback() + { + message.decrementReference(); + } + }); + } + else + { + StringBuilder xidString = xidAsString(id); + String messageNumberString = String.valueOf(message.getMessageNumber()); + CurrentActor.get().message(_logSubject, + TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(), + messageNumberString)); + + } + + } + else + { + StringBuilder xidString = xidAsString(id); + CurrentActor.get().message(_logSubject, + TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), + record.getQueue().getResourceName())); + + } + } + for(MessageStore.Transaction.Record record : dequeues) + { + final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName()); + if(queue != null) + { + final long messageId = record.getMessage().getMessageNumber(); + final AbstractServerMessageImpl message = _recoveredMessages.get(messageId); + _unusedMessages.remove(messageId); + + if(message != null) + { + final QueueEntry entry = queue.getMessageOnTheQueue(messageId); + + entry.acquire(); + + branch.dequeue(queue, message); + + branch.addPostTransactionAcion(new ServerTransaction.Action() + { + + public void postCommit() + { + entry.discard(); + } + + public void onRollback() + { + entry.release(); + } + }); + } + else + { + StringBuilder xidString = xidAsString(id); + String messageNumberString = String.valueOf(message.getMessageNumber()); + CurrentActor.get().message(_logSubject, + TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(), + messageNumberString)); + + } + + } + else + { + StringBuilder xidString = xidAsString(id); + CurrentActor.get().message(_logSubject, + TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), + queue.getName())); + } + + } + + try + { + branch.setState(DtxBranch.State.PREPARED); + branch.prePrepareTransaction(); + } + catch (AMQStoreException e) + { + _logger.error("Unexpected database exception when attempting to prepare a recovered XA transaction " + + xidAsString(id), e); + throw new RuntimeException(e); + } + } + + private static StringBuilder xidAsString(Xid id) + { + return new StringBuilder("(") + .append(id.getFormat()) + .append(',') + .append(Functions.str(id.getGlobalId())) + .append(',') + .append(Functions.str(id.getBranchId())) + .append(')'); + } + + public void completeDtxRecordRecovery() + { + for(StoredMessage m : _unusedMessages.values()) + { + _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing...."); + m.remove(); + } + CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false)); + } + private static final class ProcessAction { private final AMQQueue _queue; @@ -354,15 +514,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } - public void completeQueueEntryRecovery() + public DtxRecordRecoveryHandler completeQueueEntryRecovery() { - for(StoredMessage m : _unusedMessages.values()) - { - _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing...."); - m.remove(); - } - for(Map.Entry<String,Integer> entry : _queueRecoveries.entrySet()) { CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey())); @@ -370,7 +524,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true)); } - CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false)); + + + return this; } private static class DummyMessage implements EnqueableMessage diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index a4a3633af7..9a0606f47a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -20,21 +20,11 @@ */ package org.apache.qpid.server.virtualhost; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import javax.management.NotCompliantMBeanException; -import javax.management.ObjectName; - +import java.util.concurrent.ScheduledFuture; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; @@ -73,14 +63,25 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.txn.DtxRegistry; import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin; import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + public class VirtualHostImpl implements VirtualHost { private static final Logger _logger = Logger.getLogger(VirtualHostImpl.class); @@ -97,11 +98,11 @@ public class VirtualHostImpl implements VirtualHost private MessageStore _messageStore; - protected VirtualHostMBean _virtualHostMBean; + private DtxRegistry _dtxRegistry; - private AMQBrokerManagerMBean _brokerMBean; + private VirtualHostMBean _virtualHostMBean; - private final AuthenticationManager _authenticationManager; + private AMQBrokerManagerMBean _brokerMBean; private SecurityManager _securityManager; @@ -121,6 +122,7 @@ public class VirtualHostImpl implements VirtualHost private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5; private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>(); + public IConnectionRegistry getConnectionRegistry() { return _connectionRegistry; @@ -191,6 +193,7 @@ public class VirtualHostImpl implements VirtualHost _broker = _appRegistry.getBroker(); _configuration = hostConfig; _name = _configuration.getName(); + _dtxRegistry = new DtxRegistry(); _id = _appRegistry.getConfigStore().createId(); @@ -241,7 +244,6 @@ public class VirtualHostImpl implements VirtualHost initialiseMessageStore(hostConfig); } - _authenticationManager = ApplicationRegistry.getInstance().getAuthenticationManager(); _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); _brokerMBean.register(); @@ -260,54 +262,9 @@ public class VirtualHostImpl implements VirtualHost { if (period != 0L) { - class VirtualHostHouseKeepingTask extends HouseKeepingTask - { - public VirtualHostHouseKeepingTask(VirtualHost vhost) - { - super(vhost); - } - public void execute() - { - for (AMQQueue q : _queueRegistry.getQueues()) - { - _logger.debug("Checking message status for queue: " - + q.getName()); - try - { - q.checkMessageStatus(); - } - catch (Exception e) - { - _logger.error("Exception in housekeeping for queue: " - + q.getNameShortString().toString(), e); - //Don't throw exceptions as this will stop the - // house keeping task from running. - } - } - for (AMQConnectionModel connection : getConnectionRegistry().getConnections()) - { - _logger.debug("Checking for long running open transactions on connection " + connection); - for (AMQSessionModel session : connection.getSessionModels()) - { - _logger.debug("Checking for long running open transactions on session " + session); - try - { - session.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(), - _configuration.getTransactionTimeoutOpenClose(), - _configuration.getTransactionTimeoutIdleWarn(), - _configuration.getTransactionTimeoutIdleClose()); - } - catch (Exception e) - { - _logger.error("Exception in housekeeping for connection: " + connection.toString(), e); - } - } - } - } - } - scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask(this)); + scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask()); Map<String, VirtualHostPluginFactory> plugins = ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins(); @@ -340,6 +297,53 @@ public class VirtualHostImpl implements VirtualHost } } + private class VirtualHostHouseKeepingTask extends HouseKeepingTask + { + public VirtualHostHouseKeepingTask() + { + super(VirtualHostImpl.this); + } + + public void execute() + { + for (AMQQueue q : _queueRegistry.getQueues()) + { + _logger.debug("Checking message status for queue: " + + q.getName()); + try + { + q.checkMessageStatus(); + } + catch (Exception e) + { + _logger.error("Exception in housekeeping for queue: " + + q.getNameShortString().toString(), e); + //Don't throw exceptions as this will stop the + // house keeping task from running. + } + } + for (AMQConnectionModel connection : getConnectionRegistry().getConnections()) + { + _logger.debug("Checking for long running open transactions on connection " + connection); + for (AMQSessionModel session : connection.getSessionModels()) + { + _logger.debug("Checking for long running open transactions on session " + session); + try + { + session.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(), + _configuration.getTransactionTimeoutOpenClose(), + _configuration.getTransactionTimeoutIdleWarn(), + _configuration.getTransactionTimeoutIdleClose()); + } + catch (Exception e) + { + _logger.error("Exception in housekeeping for connection: " + connection.toString(), e); + } + } + } + } + } + /** * Allow other broker components to register a HouseKeepingTask * @@ -352,6 +356,11 @@ public class VirtualHostImpl implements VirtualHost TimeUnit.MILLISECONDS); } + public ScheduledFuture<?> scheduleTask(long delay, Runnable task) + { + return _houseKeepingTasks.schedule(task, delay, TimeUnit.MILLISECONDS); + } + public long getHouseKeepingTaskCount() { return _houseKeepingTasks.getTaskCount(); @@ -575,11 +584,6 @@ public class VirtualHostImpl implements VirtualHost return _durableConfigurationStore; } - public AuthenticationManager getAuthenticationManager() - { - return _authenticationManager; - } - public SecurityManager getSecurityManager() { return _securityManager; @@ -618,6 +622,11 @@ public class VirtualHostImpl implements VirtualHost } } + if(_dtxRegistry != null) + { + _dtxRegistry.close(); + } + //Close MessageStore if (_messageStore != null) { @@ -796,6 +805,11 @@ public class VirtualHostImpl implements VirtualHost return getApplicationRegistry().getConfigStore(); } + public DtxRegistry getDtxRegistry() + { + return _dtxRegistry; + } + /** * Temporary Startup RT class to record the creation of persistent queues / exchanges. * @@ -805,11 +819,11 @@ public class VirtualHostImpl implements VirtualHost */ private static class StartupRoutingTable implements DurableConfigurationStore { - public List<Exchange> exchange = new LinkedList<Exchange>(); - public List<CreateQueueTuple> queue = new LinkedList<CreateQueueTuple>(); - public List<CreateBindingTuple> bindings = new LinkedList<CreateBindingTuple>(); - public List<BrokerLink> links = new LinkedList<BrokerLink>(); - public List<Bridge> bridges = new LinkedList<Bridge>(); + private List<Exchange> exchange = new LinkedList<Exchange>(); + private List<CreateQueueTuple> queue = new LinkedList<CreateQueueTuple>(); + private List<CreateBindingTuple> bindings = new LinkedList<CreateBindingTuple>(); + private List<BrokerLink> links = new LinkedList<BrokerLink>(); + private List<Bridge> bridges = new LinkedList<Bridge>(); public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception { @@ -876,8 +890,8 @@ public class VirtualHostImpl implements VirtualHost private static class CreateQueueTuple { - public AMQQueue queue; - public FieldTable arguments; + private AMQQueue queue; + private FieldTable arguments; public CreateQueueTuple(AMQQueue queue, FieldTable arguments) { @@ -888,10 +902,10 @@ public class VirtualHostImpl implements VirtualHost private static class CreateBindingTuple { - public AMQQueue queue; - public FieldTable arguments; - public Exchange exchange; - public AMQShortString routingKey; + private AMQQueue queue; + private FieldTable arguments; + private Exchange exchange; + private AMQShortString routingKey; public CreateBindingTuple(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java index 32d0c4c4d1..ef621a166a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java @@ -21,8 +21,8 @@ package org.apache.qpid.server.virtualhost;
import org.apache.qpid.common.Closeable;
-import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import java.util.ArrayList;
import java.util.Collection;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java index 12206013eb..12886f400a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java @@ -20,19 +20,18 @@ */ package org.apache.qpid.server.virtualhost.plugins; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; - import org.apache.log4j.Logger; + import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration; -import org.apache.qpid.server.exchange.AbstractExchange; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.Exchange.BindingListener; import org.apache.qpid.server.queue.AMQQueue; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + /** * This is a listener that caches queues that are configured for slow consumer disconnection. * @@ -93,7 +92,7 @@ public class ConfiguredQueueBindingListener implements BindingListener /** * Lookup and return the cache of configured {@link AMQQueue}s. * - * Note that when accessing the cached queues, the {@link Iterator} is not thread safe + * Note that when accessing the cached queues, the {@link java.util.Iterator} is not thread safe * (see the {@link Collections#synchronizedSet(Set)} documentation) so a copy of the * cache is returned. * diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java index 248b3b2143..2c6705bb3b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java @@ -19,21 +19,20 @@ */ package org.apache.qpid.server.virtualhost.plugins; -import java.util.Set; -import java.util.concurrent.TimeUnit; - import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionConfiguration; import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.plugins.Plugin; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.plugins.logging.SlowConsumerDetectionMessages; import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin; +import java.util.Set; +import java.util.concurrent.TimeUnit; + public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin { private SlowConsumerDetectionConfiguration _config; @@ -61,7 +60,7 @@ public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin * virtual host to record all the configured queues in a cache for processing by the housekeeping * thread. * - * @see Plugin#configure(ConfigurationPlugin) + * @see org.apache.qpid.server.plugins.Plugin#configure(ConfigurationPlugin) */ public void configure(ConfigurationPlugin config) { @@ -98,7 +97,7 @@ public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin if (policy == null) { // We would only expect to see this during shutdown - _logger.warn("No slow consumer policy for queue " + q.getName()); + getLogger().warn("No slow consumer policy for queue " + q.getName()); } else { @@ -110,7 +109,7 @@ public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin catch (Exception e) { // Don't throw exceptions as this will stop the house keeping task from running. - _logger.error("Exception in SlowConsumersDetection for queue: " + q.getName(), e); + getLogger().error("Exception in SlowConsumersDetection for queue: " + q.getName(), e); } } @@ -139,9 +138,9 @@ public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin { if (config != null) { - if (_logger.isInfoEnabled()) + if (getLogger().isInfoEnabled()) { - _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config); + getLogger().info("Retrieved Queue(" + q.getName() + ") Config:" + config); } int count = q.getMessageCount(); @@ -157,12 +156,12 @@ public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin ((count > 0) && q.getOldestMessageArrivalTime() >= config.getMessageAge()))) { - if (_logger.isDebugEnabled()) + if (getLogger().isDebugEnabled()) { - _logger.debug("Detected Slow Consumer on Queue(" + q.getName() + ")"); - _logger.debug("Queue Count:" + q.getMessageCount() + ":" + config.getMessageCount()); - _logger.debug("Queue Depth:" + q.getQueueDepth() + ":" + config.getDepth()); - _logger.debug("Queue Arrival:" + q.getOldestMessageArrivalTime() + ":" + config.getMessageAge()); + getLogger().debug("Detected Slow Consumer on Queue(" + q.getName() + ")"); + getLogger().debug("Queue Count:" + q.getMessageCount() + ":" + config.getMessageCount()); + getLogger().debug("Queue Depth:" + q.getQueueDepth() + ":" + config.getDepth()); + getLogger().debug("Queue Arrival:" + q.getOldestMessageArrivalTime() + ":" + config.getMessageAge()); } return true; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostHouseKeepingPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostHouseKeepingPlugin.java index 3798f47f0b..191f8041d2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostHouseKeepingPlugin.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostHouseKeepingPlugin.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.virtualhost.plugins; import org.apache.log4j.Logger; + import org.apache.qpid.server.virtualhost.HouseKeepingTask; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -28,7 +29,7 @@ import java.util.concurrent.TimeUnit; public abstract class VirtualHostHouseKeepingPlugin extends HouseKeepingTask implements VirtualHostPlugin { - protected final Logger _logger = Logger.getLogger(getClass()); + private final Logger _logger = Logger.getLogger(getClass()); public VirtualHostHouseKeepingPlugin(VirtualHost vhost) { @@ -51,4 +52,10 @@ public abstract class VirtualHostHouseKeepingPlugin extends HouseKeepingTask imp * @see java.util.concurrent.TimeUnit for valid value. */ public abstract TimeUnit getTimeUnit(); + + + protected Logger getLogger() + { + return _logger; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPlugin.java index 1886c2d01d..35f6228ab9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPlugin.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPlugin.java @@ -20,10 +20,9 @@ */ package org.apache.qpid.server.virtualhost.plugins; -import java.util.concurrent.TimeUnit; - import org.apache.qpid.server.plugins.Plugin; -import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.util.concurrent.TimeUnit; public interface VirtualHostPlugin extends Runnable, Plugin { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicy.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicy.java index 6028f63fdb..f2f61f204e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicy.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicy.java @@ -22,10 +22,10 @@ package org.apache.qpid.server.virtualhost.plugins.policies; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionPolicyConfiguration; import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.logging.actors.CurrentActor; @@ -37,7 +37,7 @@ import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFa public class TopicDeletePolicy implements SlowConsumerPolicyPlugin { - Logger _logger = Logger.getLogger(TopicDeletePolicy.class); + private Logger _logger = Logger.getLogger(TopicDeletePolicy.class); private TopicDeletePolicyConfiguration _configuration; public static class TopicDeletePolicyFactory implements SlowConsumerPolicyPluginFactory diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyConfiguration.java index 7dfd22c733..48158b7dff 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyConfiguration.java @@ -20,14 +20,15 @@ */ package org.apache.qpid.server.virtualhost.plugins.policies; -import java.util.Arrays; -import java.util.List; - import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; + import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; +import java.util.Arrays; +import java.util.List; + public class TopicDeletePolicyConfiguration extends ConfigurationPlugin { |