diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java | 28 |
1 files changed, 22 insertions, 6 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java index 5c4fe0aab8..248b3b2143 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java @@ -26,11 +26,13 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionConfiguration; import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration; import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.plugins.Plugin; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.plugins.logging.SlowConsumerDetectionMessages; +import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin; public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin { @@ -56,7 +58,7 @@ public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin /** * Configures the slow consumer disconnect plugin by adding a listener to each exchange on this - * cirtual host to record all the configured queues in a cache for processing by the housekeeping + * virtual host to record all the configured queues in a cache for processing by the housekeeping * thread. * * @see Plugin#configure(ConfigurationPlugin) @@ -65,9 +67,10 @@ public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin { _config = (SlowConsumerDetectionConfiguration) config; _listener = new ConfiguredQueueBindingListener(getVirtualHost().getName()); - for (AMQShortString exchangeName : getVirtualHost().getExchangeRegistry().getExchangeNames()) + final ExchangeRegistry exchangeRegistry = getVirtualHost().getExchangeRegistry(); + for (AMQShortString exchangeName : exchangeRegistry.getExchangeNames()) { - getVirtualHost().getExchangeRegistry().getExchange(exchangeName).addBindingListener(_listener); + exchangeRegistry.getExchange(exchangeName).addBindingListener(_listener); } } @@ -87,11 +90,21 @@ public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin try { - SlowConsumerDetectionQueueConfiguration config = + final SlowConsumerDetectionQueueConfiguration config = q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); if (checkQueueStatus(q, config)) { - config.getPolicy().performPolicy(q); + final SlowConsumerPolicyPlugin policy = config.getPolicy(); + if (policy == null) + { + // We would only expect to see this during shutdown + _logger.warn("No slow consumer policy for queue " + q.getName()); + } + else + { + policy.performPolicy(q); + } + } } catch (Exception e) @@ -126,7 +139,10 @@ public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin { if (config != null) { - _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config); + if (_logger.isInfoEnabled()) + { + _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config); + } int count = q.getMessageCount(); |