summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java
blob: 2c6705bb3bb670b0135006ab5a8f1b3160a71b3c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.server.virtualhost.plugins;

import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionConfiguration;
import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.plugins.logging.SlowConsumerDetectionMessages;
import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin;

import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * House keeping plugin that inspects the queues recorded by its
 * {@link ConfiguredQueueBindingListener} and, for each queue that has reached one of its
 * configured slow consumer thresholds, runs the {@link SlowConsumerPolicyPlugin} configured
 * for that queue.
 */
public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
{
    /** Plugin level configuration; supplies the delay and time unit reported by this task. */
    private SlowConsumerDetectionConfiguration _config;

    /** Binding listener registered on the exchanges; source of the queues examined by {@link #execute()}. */
    private ConfiguredQueueBindingListener _listener;

    /**
     * Factory that creates a configured {@link SlowConsumerDetection} for a virtual host.
     */
    public static class SlowConsumerFactory implements VirtualHostPluginFactory
    {
        /**
         * Creates and configures the plugin for the given virtual host.
         *
         * @param vhost the virtual host the plugin will run against
         *
         * @return the configured plugin, or null if the virtual host has no
         *         {@link SlowConsumerDetectionConfiguration}
         */
        public SlowConsumerDetection newInstance(VirtualHost vhost)
        {
            SlowConsumerDetectionConfiguration config = vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class.getName());

            if (config == null)
            {
                return null;
            }

            SlowConsumerDetection plugin = new SlowConsumerDetection(vhost);
            plugin.configure(config);
            return plugin;
        }
    }

    /**
     * Configures the slow consumer disconnect plugin by adding a listener to each exchange on this
     * virtual host to record all the configured queues in a cache for processing by the housekeeping
     * thread.
     *
     * Only the exchanges present in the registry when this method is called receive the listener.
     *
     * @param config the plugin configuration; must be a {@link SlowConsumerDetectionConfiguration}
     *
     * @see org.apache.qpid.server.plugins.Plugin#configure(ConfigurationPlugin)
     */
    public void configure(ConfigurationPlugin config)
    {
        _config = (SlowConsumerDetectionConfiguration) config;
        _listener = new ConfiguredQueueBindingListener(getVirtualHost().getName());
        final ExchangeRegistry exchangeRegistry = getVirtualHost().getExchangeRegistry();
        for (AMQShortString exchangeName : exchangeRegistry.getExchangeNames())
        {
            exchangeRegistry.getExchange(exchangeName).addBindingListener(_listener);
        }
    }

    /**
     * Creates the plugin for the given virtual host. {@link #configure(ConfigurationPlugin)} must
     * be called before the plugin is executed, as it creates the listener that
     * {@link #execute()} reads from.
     *
     * @param vhost the virtual host this plugin belongs to
     */
    public SlowConsumerDetection(VirtualHost vhost)
    {
        super(vhost);
    }

    /**
     * Checks every queue in the listener's cache and performs the queue's slow consumer policy
     * when one of its thresholds has been reached. Failures on one queue are logged and do not
     * prevent the remaining queues from being checked.
     */
    public void execute()
    {
        CurrentActor.get().message(SlowConsumerDetectionMessages.RUNNING());

        Set<AMQQueue> cache = _listener.getQueueCache();
        for (AMQQueue q : cache)
        {
            CurrentActor.get().message(SlowConsumerDetectionMessages.CHECKING_QUEUE(q.getName()));

            try
            {
                final SlowConsumerDetectionQueueConfiguration config =
                    q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName());

                // checkQueueStatus() returns false for a null config, so config is
                // known to be non-null inside this branch.
                if (checkQueueStatus(q, config))
                {
                    final SlowConsumerPolicyPlugin policy = config.getPolicy();
                    if (policy == null)
                    {
                        // We would only expect to see this during shutdown
                        getLogger().warn("No slow consumer policy for queue " + q.getName());
                    }
                    else
                    {
                        policy.performPolicy(q);
                    }

                }
            }
            catch (Exception e)
            {
                // Don't throw exceptions as this will stop the house keeping task from running.
                getLogger().error("Exception in SlowConsumersDetection for queue: " + q.getName(), e);
            }
        }

        CurrentActor.get().message(SlowConsumerDetectionMessages.COMPLETE());
    }

    /**
     * @return the delay taken from the plugin configuration
     */
    public long getDelay()
    {
        return _config.getDelay();
    }

    /**
     * @return the time unit taken from the plugin configuration
     */
    public TimeUnit getTimeUnit()
    {
        return _config.getTimeUnit();
    }

    /**
     * Check the depth,messageSize,messageAge,messageCount values for this q
     *
     * A threshold configured as 0 is treated as disabled. The message age threshold is compared
     * against the time elapsed since the oldest message on the queue arrived.
     *
     * @param q      the queue to check
     * @param config the queue configuration to compare against the queue state, may be null
     *
     * @return true if the queue has reached a threshold, false otherwise or if config is null.
     */
    private boolean checkQueueStatus(AMQQueue q, SlowConsumerDetectionQueueConfiguration config)
    {
        if (config != null)
        {
            if (getLogger().isInfoEnabled())
            {
                getLogger().info("Retrieved Queue(" + q.getName() + ") Config:" + config);
            }

            int count = q.getMessageCount();

            // First Check message counts
            if ((config.getMessageCount() != 0 && count >= config.getMessageCount()) ||
                // The check queue depth
                (config.getDepth() != 0 && q.getQueueDepth() >= config.getDepth()) ||
                // finally if we have messages on the queue check the age of the oldest message.
                // We must check count as OldestArrival time is Long.MAX_VALUE when
                // there are no messages.
                //
                // The arrival time is a point in time while the configured message age is a
                // duration, so the threshold has to be compared with the elapsed time since
                // arrival rather than with the arrival time itself.
                // NOTE(review): assumes the arrival time is a System.currentTimeMillis() style
                // timestamp and the configured age is in milliseconds - confirm.
                (config.getMessageAge() != 0 &&
                 ((count > 0) &&
                  (System.currentTimeMillis() - q.getOldestMessageArrivalTime()) >= config.getMessageAge())))
            {

                if (getLogger().isDebugEnabled())
                {
                    getLogger().debug("Detected Slow Consumer on Queue(" + q.getName() + ")");
                    getLogger().debug("Queue Count:" + q.getMessageCount() + ":" + config.getMessageCount());
                    getLogger().debug("Queue Depth:" + q.getQueueDepth() + ":" + config.getDepth());
                    getLogger().debug("Queue Arrival:" + q.getOldestMessageArrivalTime() + ":" + config.getMessageAge());
                }

                return true;
            }
        }
        return false;
    }

}