summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java4
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java10
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java238
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java172
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java29
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostHouseKeepingPlugin.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPlugin.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicy.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyConfiguration.java7
12 files changed, 338 insertions, 160 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java
index ebace95f65..523bafb8e1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java
@@ -21,13 +21,14 @@
package org.apache.qpid.server.virtualhost;
import org.apache.log4j.Logger;
+
import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.qpid.server.logging.actors.AbstractActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
public abstract class HouseKeepingTask implements Runnable
{
- Logger _logger = Logger.getLogger(this.getClass());
+ private Logger _logger = Logger.getLogger(this.getClass());
private VirtualHost _virtualHost;
@@ -59,7 +60,7 @@ public abstract class HouseKeepingTask implements Runnable
{
execute();
}
- catch (Throwable e)
+ catch (Exception e)
{
_logger.warn(this.getClass().getSimpleName() + " throw exception: " + e, e);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java
index 767474d5ae..cb7f213f06 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java
@@ -20,10 +20,10 @@
*/
package org.apache.qpid.server.virtualhost;
-import java.io.IOException;
-
import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute;
+import java.io.IOException;
+
/**
* The management interface exposed to allow management of a virtualHost
*/
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 41a5471a64..4b586db628 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -22,7 +22,7 @@ package org.apache.qpid.server.virtualhost;
import java.util.Map;
import java.util.UUID;
-
+import java.util.concurrent.ScheduledFuture;
import org.apache.qpid.common.Closeable;
import org.apache.qpid.server.binding.BindingFactory;
import org.apache.qpid.server.configuration.ConfigStore;
@@ -37,10 +37,10 @@ import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.DtxRegistry;
public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable, StatisticsGatherer
{
@@ -60,8 +60,6 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo
DurableConfigurationStore getDurableConfigurationStore();
- AuthenticationManager getAuthenticationManager();
-
SecurityManager getSecurityManager();
void close();
@@ -97,7 +95,11 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo
ConfigStore getConfigStore();
+ DtxRegistry getDtxRegistry();
+
void removeBrokerConnection(BrokerLink brokerLink);
LinkRegistry getLinkRegistry(String remoteContainerId);
+
+ ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index 51892d965a..0e965472d5 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -20,43 +20,47 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.binding.BindingFactory;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.federation.BrokerLink;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.TransactionLogMessages;
-import org.apache.qpid.server.message.AMQMessage;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.server.binding.BindingFactory;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
-
-import org.apache.log4j.Logger;
+import org.apache.qpid.server.txn.DtxBranch;
+import org.apache.qpid.server.txn.DtxRegistry;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.transport.Xid;
+import org.apache.qpid.transport.util.Functions;
import org.apache.qpid.util.ByteBufferInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.TreeMap;
-import java.util.UUID;
-
public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler,
ConfigurationRecoveryHandler.QueueRecoveryHandler,
ConfigurationRecoveryHandler.ExchangeRecoveryHandler,
@@ -65,7 +69,8 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
MessageStoreRecoveryHandler,
MessageStoreRecoveryHandler.StoredMessageRecoveryHandler,
TransactionLogRecoveryHandler,
- TransactionLogRecoveryHandler.QueueEntryRecoveryHandler
+ TransactionLogRecoveryHandler.QueueEntryRecoveryHandler,
+ TransactionLogRecoveryHandler.DtxRecordRecoveryHandler
{
private static final Logger _logger = Logger.getLogger(VirtualHostConfigRecoveryHandler.class);
@@ -78,7 +83,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
private MessageStore _store;
private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
- private Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>();
+ private Map<Long, AbstractServerMessageImpl> _recoveredMessages = new HashMap<Long, AbstractServerMessageImpl>();
private Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
@@ -160,7 +165,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
public void message(StoredMessage message)
{
- ServerMessage serverMessage;
+ AbstractServerMessageImpl serverMessage;
switch(message.getMetaData().getType())
{
case META_DATA_0_8:
@@ -173,9 +178,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
throw new RuntimeException("Unknown message type retrieved from store " + message.getMetaData().getClass());
}
- //_logger.debug("Recovered message with id " + serverMessage);
-
-
_recoveredMessages.put(message.getMessageNumber(), serverMessage);
_unusedMessages.put(message.getMessageNumber(), message);
}
@@ -198,6 +200,164 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
{
}
+ public void dtxRecord(long format, byte[] globalId, byte[] branchId,
+ MessageStore.Transaction.Record[] enqueues,
+ MessageStore.Transaction.Record[] dequeues)
+ {
+ Xid id = new Xid(format, globalId, branchId);
+ DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry();
+ DtxBranch branch = dtxRegistry.getBranch(id);
+ if(branch == null)
+ {
+ branch = new DtxBranch(id, _store, _virtualHost);
+ dtxRegistry.registerBranch(branch);
+ }
+ for(MessageStore.Transaction.Record record : enqueues)
+ {
+ final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName());
+ if(queue != null)
+ {
+ final long messageId = record.getMessage().getMessageNumber();
+ final AbstractServerMessageImpl message = _recoveredMessages.get(messageId);
+ _unusedMessages.remove(messageId);
+
+ if(message != null)
+ {
+ message.incrementReference();
+
+ branch.enqueue(queue,message);
+
+ branch.addPostTransactionAcion(new ServerTransaction.Action()
+ {
+
+ public void postCommit()
+ {
+ try
+ {
+
+ queue.enqueue(message, true, null);
+ message.decrementReference();
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Unable to enqueue message " + message.getMessageNumber() + " into " +
+ "queue " + queue.getName() + " (from XA transaction)", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void onRollback()
+ {
+ message.decrementReference();
+ }
+ });
+ }
+ else
+ {
+ StringBuilder xidString = xidAsString(id);
+ String messageNumberString = String.valueOf(message.getMessageNumber());
+ CurrentActor.get().message(_logSubject,
+ TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
+ messageNumberString));
+
+ }
+
+ }
+ else
+ {
+ StringBuilder xidString = xidAsString(id);
+ CurrentActor.get().message(_logSubject,
+ TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
+ record.getQueue().getResourceName()));
+
+ }
+ }
+ for(MessageStore.Transaction.Record record : dequeues)
+ {
+ final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName());
+ if(queue != null)
+ {
+ final long messageId = record.getMessage().getMessageNumber();
+ final AbstractServerMessageImpl message = _recoveredMessages.get(messageId);
+ _unusedMessages.remove(messageId);
+
+ if(message != null)
+ {
+ final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
+
+ entry.acquire();
+
+ branch.dequeue(queue, message);
+
+ branch.addPostTransactionAcion(new ServerTransaction.Action()
+ {
+
+ public void postCommit()
+ {
+ entry.discard();
+ }
+
+ public void onRollback()
+ {
+ entry.release();
+ }
+ });
+ }
+ else
+ {
+ StringBuilder xidString = xidAsString(id);
+ String messageNumberString = String.valueOf(message.getMessageNumber());
+ CurrentActor.get().message(_logSubject,
+ TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
+ messageNumberString));
+
+ }
+
+ }
+ else
+ {
+ StringBuilder xidString = xidAsString(id);
+ CurrentActor.get().message(_logSubject,
+ TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
+ queue.getName()));
+ }
+
+ }
+
+ try
+ {
+ branch.setState(DtxBranch.State.PREPARED);
+ branch.prePrepareTransaction();
+ }
+ catch (AMQStoreException e)
+ {
+ _logger.error("Unexpected database exception when attempting to prepare a recovered XA transaction " +
+ xidAsString(id), e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static StringBuilder xidAsString(Xid id)
+ {
+ return new StringBuilder("(")
+ .append(id.getFormat())
+ .append(',')
+ .append(Functions.str(id.getGlobalId()))
+ .append(',')
+ .append(Functions.str(id.getBranchId()))
+ .append(')');
+ }
+
+ public void completeDtxRecordRecovery()
+ {
+ for(StoredMessage m : _unusedMessages.values())
+ {
+ _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing....");
+ m.remove();
+ }
+ CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
+ }
+
private static final class ProcessAction
{
private final AMQQueue _queue;
@@ -354,15 +514,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
}
- public void completeQueueEntryRecovery()
+ public DtxRecordRecoveryHandler completeQueueEntryRecovery()
{
- for(StoredMessage m : _unusedMessages.values())
- {
- _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing....");
- m.remove();
- }
-
for(Map.Entry<String,Integer> entry : _queueRecoveries.entrySet())
{
CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey()));
@@ -370,7 +524,9 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true));
}
- CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
+
+
+ return this;
}
private static class DummyMessage implements EnqueableMessage
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index a4a3633af7..9a0606f47a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -20,21 +20,11 @@
*/
package org.apache.qpid.server.virtualhost;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import javax.management.NotCompliantMBeanException;
-import javax.management.ObjectName;
-
+import java.util.concurrent.ScheduledFuture;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
@@ -73,14 +63,25 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin;
import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
public class VirtualHostImpl implements VirtualHost
{
private static final Logger _logger = Logger.getLogger(VirtualHostImpl.class);
@@ -97,11 +98,11 @@ public class VirtualHostImpl implements VirtualHost
private MessageStore _messageStore;
- protected VirtualHostMBean _virtualHostMBean;
+ private DtxRegistry _dtxRegistry;
- private AMQBrokerManagerMBean _brokerMBean;
+ private VirtualHostMBean _virtualHostMBean;
- private final AuthenticationManager _authenticationManager;
+ private AMQBrokerManagerMBean _brokerMBean;
private SecurityManager _securityManager;
@@ -121,6 +122,7 @@ public class VirtualHostImpl implements VirtualHost
private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5;
private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>();
+
public IConnectionRegistry getConnectionRegistry()
{
return _connectionRegistry;
@@ -191,6 +193,7 @@ public class VirtualHostImpl implements VirtualHost
_broker = _appRegistry.getBroker();
_configuration = hostConfig;
_name = _configuration.getName();
+ _dtxRegistry = new DtxRegistry();
_id = _appRegistry.getConfigStore().createId();
@@ -241,7 +244,6 @@ public class VirtualHostImpl implements VirtualHost
initialiseMessageStore(hostConfig);
}
- _authenticationManager = ApplicationRegistry.getInstance().getAuthenticationManager();
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
_brokerMBean.register();
@@ -260,54 +262,9 @@ public class VirtualHostImpl implements VirtualHost
{
if (period != 0L)
{
- class VirtualHostHouseKeepingTask extends HouseKeepingTask
- {
- public VirtualHostHouseKeepingTask(VirtualHost vhost)
- {
- super(vhost);
- }
- public void execute()
- {
- for (AMQQueue q : _queueRegistry.getQueues())
- {
- _logger.debug("Checking message status for queue: "
- + q.getName());
- try
- {
- q.checkMessageStatus();
- }
- catch (Exception e)
- {
- _logger.error("Exception in housekeeping for queue: "
- + q.getNameShortString().toString(), e);
- //Don't throw exceptions as this will stop the
- // house keeping task from running.
- }
- }
- for (AMQConnectionModel connection : getConnectionRegistry().getConnections())
- {
- _logger.debug("Checking for long running open transactions on connection " + connection);
- for (AMQSessionModel session : connection.getSessionModels())
- {
- _logger.debug("Checking for long running open transactions on session " + session);
- try
- {
- session.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(),
- _configuration.getTransactionTimeoutOpenClose(),
- _configuration.getTransactionTimeoutIdleWarn(),
- _configuration.getTransactionTimeoutIdleClose());
- }
- catch (Exception e)
- {
- _logger.error("Exception in housekeeping for connection: " + connection.toString(), e);
- }
- }
- }
- }
- }
- scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask(this));
+ scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask());
Map<String, VirtualHostPluginFactory> plugins =
ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins();
@@ -340,6 +297,53 @@ public class VirtualHostImpl implements VirtualHost
}
}
+ private class VirtualHostHouseKeepingTask extends HouseKeepingTask
+ {
+ public VirtualHostHouseKeepingTask()
+ {
+ super(VirtualHostImpl.this);
+ }
+
+ public void execute()
+ {
+ for (AMQQueue q : _queueRegistry.getQueues())
+ {
+ _logger.debug("Checking message status for queue: "
+ + q.getName());
+ try
+ {
+ q.checkMessageStatus();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Exception in housekeeping for queue: "
+ + q.getNameShortString().toString(), e);
+ //Don't throw exceptions as this will stop the
+ // house keeping task from running.
+ }
+ }
+ for (AMQConnectionModel connection : getConnectionRegistry().getConnections())
+ {
+ _logger.debug("Checking for long running open transactions on connection " + connection);
+ for (AMQSessionModel session : connection.getSessionModels())
+ {
+ _logger.debug("Checking for long running open transactions on session " + session);
+ try
+ {
+ session.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(),
+ _configuration.getTransactionTimeoutOpenClose(),
+ _configuration.getTransactionTimeoutIdleWarn(),
+ _configuration.getTransactionTimeoutIdleClose());
+ }
+ catch (Exception e)
+ {
+ _logger.error("Exception in housekeeping for connection: " + connection.toString(), e);
+ }
+ }
+ }
+ }
+ }
+
/**
* Allow other broker components to register a HouseKeepingTask
*
@@ -352,6 +356,11 @@ public class VirtualHostImpl implements VirtualHost
TimeUnit.MILLISECONDS);
}
+ public ScheduledFuture<?> scheduleTask(long delay, Runnable task)
+ {
+ return _houseKeepingTasks.schedule(task, delay, TimeUnit.MILLISECONDS);
+ }
+
public long getHouseKeepingTaskCount()
{
return _houseKeepingTasks.getTaskCount();
@@ -575,11 +584,6 @@ public class VirtualHostImpl implements VirtualHost
return _durableConfigurationStore;
}
- public AuthenticationManager getAuthenticationManager()
- {
- return _authenticationManager;
- }
-
public SecurityManager getSecurityManager()
{
return _securityManager;
@@ -618,6 +622,11 @@ public class VirtualHostImpl implements VirtualHost
}
}
+ if(_dtxRegistry != null)
+ {
+ _dtxRegistry.close();
+ }
+
//Close MessageStore
if (_messageStore != null)
{
@@ -796,6 +805,11 @@ public class VirtualHostImpl implements VirtualHost
return getApplicationRegistry().getConfigStore();
}
+ public DtxRegistry getDtxRegistry()
+ {
+ return _dtxRegistry;
+ }
+
/**
* Temporary Startup RT class to record the creation of persistent queues / exchanges.
*
@@ -805,11 +819,11 @@ public class VirtualHostImpl implements VirtualHost
*/
private static class StartupRoutingTable implements DurableConfigurationStore
{
- public List<Exchange> exchange = new LinkedList<Exchange>();
- public List<CreateQueueTuple> queue = new LinkedList<CreateQueueTuple>();
- public List<CreateBindingTuple> bindings = new LinkedList<CreateBindingTuple>();
- public List<BrokerLink> links = new LinkedList<BrokerLink>();
- public List<Bridge> bridges = new LinkedList<Bridge>();
+ private List<Exchange> exchange = new LinkedList<Exchange>();
+ private List<CreateQueueTuple> queue = new LinkedList<CreateQueueTuple>();
+ private List<CreateBindingTuple> bindings = new LinkedList<CreateBindingTuple>();
+ private List<BrokerLink> links = new LinkedList<BrokerLink>();
+ private List<Bridge> bridges = new LinkedList<Bridge>();
public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
{
@@ -876,8 +890,8 @@ public class VirtualHostImpl implements VirtualHost
private static class CreateQueueTuple
{
- public AMQQueue queue;
- public FieldTable arguments;
+ private AMQQueue queue;
+ private FieldTable arguments;
public CreateQueueTuple(AMQQueue queue, FieldTable arguments)
{
@@ -888,10 +902,10 @@ public class VirtualHostImpl implements VirtualHost
private static class CreateBindingTuple
{
- public AMQQueue queue;
- public FieldTable arguments;
- public Exchange exchange;
- public AMQShortString routingKey;
+ private AMQQueue queue;
+ private FieldTable arguments;
+ private Exchange exchange;
+ private AMQShortString routingKey;
public CreateBindingTuple(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
index 32d0c4c4d1..ef621a166a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
@@ -21,8 +21,8 @@
package org.apache.qpid.server.virtualhost;
import org.apache.qpid.common.Closeable;
-import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import java.util.ArrayList;
import java.util.Collection;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java
index 12206013eb..12886f400a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java
@@ -20,19 +20,18 @@
*/
package org.apache.qpid.server.virtualhost.plugins;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-
import org.apache.log4j.Logger;
+
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration;
-import org.apache.qpid.server.exchange.AbstractExchange;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.Exchange.BindingListener;
import org.apache.qpid.server.queue.AMQQueue;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
/**
* This is a listener that caches queues that are configured for slow consumer disconnection.
*
@@ -93,7 +92,7 @@ public class ConfiguredQueueBindingListener implements BindingListener
/**
* Lookup and return the cache of configured {@link AMQQueue}s.
*
- * Note that when accessing the cached queues, the {@link Iterator} is not thread safe
+ * Note that when accessing the cached queues, the {@link java.util.Iterator} is not thread safe
* (see the {@link Collections#synchronizedSet(Set)} documentation) so a copy of the
* cache is returned.
*
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java
index 248b3b2143..2c6705bb3b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java
@@ -19,21 +19,20 @@
*/
package org.apache.qpid.server.virtualhost.plugins;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionConfiguration;
import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.plugins.Plugin;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.plugins.logging.SlowConsumerDetectionMessages;
import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
{
private SlowConsumerDetectionConfiguration _config;
@@ -61,7 +60,7 @@ public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
* virtual host to record all the configured queues in a cache for processing by the housekeeping
* thread.
*
- * @see Plugin#configure(ConfigurationPlugin)
+ * @see org.apache.qpid.server.plugins.Plugin#configure(ConfigurationPlugin)
*/
public void configure(ConfigurationPlugin config)
{
@@ -98,7 +97,7 @@ public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
if (policy == null)
{
// We would only expect to see this during shutdown
- _logger.warn("No slow consumer policy for queue " + q.getName());
+ getLogger().warn("No slow consumer policy for queue " + q.getName());
}
else
{
@@ -110,7 +109,7 @@ public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
catch (Exception e)
{
// Don't throw exceptions as this will stop the house keeping task from running.
- _logger.error("Exception in SlowConsumersDetection for queue: " + q.getName(), e);
+ getLogger().error("Exception in SlowConsumersDetection for queue: " + q.getName(), e);
}
}
@@ -139,9 +138,9 @@ public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
{
if (config != null)
{
- if (_logger.isInfoEnabled())
+ if (getLogger().isInfoEnabled())
{
- _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config);
+ getLogger().info("Retrieved Queue(" + q.getName() + ") Config:" + config);
}
int count = q.getMessageCount();
@@ -157,12 +156,12 @@ public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
((count > 0) && q.getOldestMessageArrivalTime() >= config.getMessageAge())))
{
- if (_logger.isDebugEnabled())
+ if (getLogger().isDebugEnabled())
{
- _logger.debug("Detected Slow Consumer on Queue(" + q.getName() + ")");
- _logger.debug("Queue Count:" + q.getMessageCount() + ":" + config.getMessageCount());
- _logger.debug("Queue Depth:" + q.getQueueDepth() + ":" + config.getDepth());
- _logger.debug("Queue Arrival:" + q.getOldestMessageArrivalTime() + ":" + config.getMessageAge());
+ getLogger().debug("Detected Slow Consumer on Queue(" + q.getName() + ")");
+ getLogger().debug("Queue Count:" + q.getMessageCount() + ":" + config.getMessageCount());
+ getLogger().debug("Queue Depth:" + q.getQueueDepth() + ":" + config.getDepth());
+ getLogger().debug("Queue Arrival:" + q.getOldestMessageArrivalTime() + ":" + config.getMessageAge());
}
return true;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostHouseKeepingPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostHouseKeepingPlugin.java
index 3798f47f0b..191f8041d2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostHouseKeepingPlugin.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostHouseKeepingPlugin.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.virtualhost.plugins;
import org.apache.log4j.Logger;
+
import org.apache.qpid.server.virtualhost.HouseKeepingTask;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -28,7 +29,7 @@ import java.util.concurrent.TimeUnit;
public abstract class VirtualHostHouseKeepingPlugin extends HouseKeepingTask implements VirtualHostPlugin
{
- protected final Logger _logger = Logger.getLogger(getClass());
+ private final Logger _logger = Logger.getLogger(getClass());
public VirtualHostHouseKeepingPlugin(VirtualHost vhost)
{
@@ -51,4 +52,10 @@ public abstract class VirtualHostHouseKeepingPlugin extends HouseKeepingTask imp
* @see java.util.concurrent.TimeUnit for valid value.
*/
public abstract TimeUnit getTimeUnit();
+
+
+ protected Logger getLogger()
+ {
+ return _logger;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPlugin.java
index 1886c2d01d..35f6228ab9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPlugin.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPlugin.java
@@ -20,10 +20,9 @@
*/
package org.apache.qpid.server.virtualhost.plugins;
-import java.util.concurrent.TimeUnit;
-
import org.apache.qpid.server.plugins.Plugin;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.concurrent.TimeUnit;
public interface VirtualHostPlugin extends Runnable, Plugin
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicy.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicy.java
index 6028f63fdb..f2f61f204e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicy.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicy.java
@@ -22,10 +22,10 @@ package org.apache.qpid.server.virtualhost.plugins.policies;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionPolicyConfiguration;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -37,7 +37,7 @@ import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFa
public class TopicDeletePolicy implements SlowConsumerPolicyPlugin
{
- Logger _logger = Logger.getLogger(TopicDeletePolicy.class);
+ private Logger _logger = Logger.getLogger(TopicDeletePolicy.class);
private TopicDeletePolicyConfiguration _configuration;
public static class TopicDeletePolicyFactory implements SlowConsumerPolicyPluginFactory
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyConfiguration.java
index 7dfd22c733..48158b7dff 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/policies/TopicDeletePolicyConfiguration.java
@@ -20,14 +20,15 @@
*/
package org.apache.qpid.server.virtualhost.plugins.policies;
-import java.util.Arrays;
-import java.util.List;
-
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
+
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
+import java.util.Arrays;
+import java.util.List;
+
public class TopicDeletePolicyConfiguration extends ConfigurationPlugin
{