summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java28
1 files changed, 22 insertions, 6 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java
index 5c4fe0aab8..248b3b2143 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java
@@ -26,11 +26,13 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionConfiguration;
import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.plugins.Plugin;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.plugins.logging.SlowConsumerDetectionMessages;
+import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin;
public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
{
@@ -56,7 +58,7 @@ public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
/**
* Configures the slow consumer disconnect plugin by adding a listener to each exchange on this
- * cirtual host to record all the configured queues in a cache for processing by the housekeeping
+ * virtual host to record all the configured queues in a cache for processing by the housekeeping
* thread.
*
* @see Plugin#configure(ConfigurationPlugin)
@@ -65,9 +67,10 @@ public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
{
_config = (SlowConsumerDetectionConfiguration) config;
_listener = new ConfiguredQueueBindingListener(getVirtualHost().getName());
- for (AMQShortString exchangeName : getVirtualHost().getExchangeRegistry().getExchangeNames())
+ final ExchangeRegistry exchangeRegistry = getVirtualHost().getExchangeRegistry();
+ for (AMQShortString exchangeName : exchangeRegistry.getExchangeNames())
{
- getVirtualHost().getExchangeRegistry().getExchange(exchangeName).addBindingListener(_listener);
+ exchangeRegistry.getExchange(exchangeName).addBindingListener(_listener);
}
}
@@ -87,11 +90,21 @@ public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
try
{
- SlowConsumerDetectionQueueConfiguration config =
+ final SlowConsumerDetectionQueueConfiguration config =
q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName());
if (checkQueueStatus(q, config))
{
- config.getPolicy().performPolicy(q);
+ final SlowConsumerPolicyPlugin policy = config.getPolicy();
+ if (policy == null)
+ {
+ // We would only expect to see this during shutdown
+ _logger.warn("No slow consumer policy for queue " + q.getName());
+ }
+ else
+ {
+ policy.performPolicy(q);
+ }
+
}
}
catch (Exception e)
@@ -126,7 +139,10 @@ public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
{
if (config != null)
{
- _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config);
+ }
int count = q.getMessageCount();