summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/ConfiguredQueueBindingListener.java
blob: 12886f400a3583ac74c22dd67acedfb096309c4b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.server.virtualhost.plugins;

import org.apache.log4j.Logger;

import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.Exchange.BindingListener;
import org.apache.qpid.server.queue.AMQQueue;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
 * This is a listener that caches queues that are configured for slow consumer disconnection.
 * 
 * There should be one listener per virtual host, which can be added to all exchanges on
 * that host.
 * 
 * TODO In future, it will be possible to configure the policy at runtime, so only the queue
 * itself is cached, and the configuration looked up by the housekeeping thread. This means
 * that there may be occasions where the copy of the cache contents retrieved by the thread
 * does not contain queues that are configured, or that configured queues are not present.
 * 
 * @see BindingListener
 */
public class ConfiguredQueueBindingListener implements BindingListener
{
    private static final Logger _log = Logger.getLogger(ConfiguredQueueBindingListener.class);
    
    private String _vhostName;
    private Set<AMQQueue> _cache = Collections.synchronizedSet(new HashSet<AMQQueue>());
    
    public ConfiguredQueueBindingListener(String vhostName)
    {
        _vhostName = vhostName;
    }

    /**
     * @see BindingListener#bindingAdded(Exchange, Binding)
     */
    public void bindingAdded(Exchange exchange, Binding binding)
    {
        processBinding(binding);
    }

    /**
     * @see BindingListener#bindingRemoved(Exchange, Binding)
     */
    public void bindingRemoved(Exchange exchange, Binding binding)
    {
        processBinding(binding);
    }
    
    private void processBinding(Binding binding)
    {
        AMQQueue queue = binding.getQueue();
        
        SlowConsumerDetectionQueueConfiguration config =
            queue.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName());
        if (config != null)
        {
            _cache.add(queue);
        }
        else
        {
            _cache.remove(queue);
        }
    }
    
    /**
     * Lookup and return the cache of configured {@link AMQQueue}s.
     * 
	 * Note that when accessing the cached queues, the {@link java.util.Iterator} is not thread safe
	 * (see the {@link Collections#synchronizedSet(Set)} documentation) so a copy of the
	 * cache is returned.
     * 
     * @return a copy of the cached {@link java.util.Set} of queues
     */
    public Set<AMQQueue> getQueueCache()
    {
        return new HashSet<AMQQueue>(_cache);
    }
}