summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/DeliveryCountTracker.java
blob: af01b26aec126472e0c780727f15d2fb50ebdc35 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
package org.apache.qpid.client;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.collections.BidiMap;
import org.apache.commons.collections.bidimap.TreeBidiMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks how many times messages have been seen by this client, keyed by JMSMessageID,
 * and allows the count to be looked up using the delivery tag most recently recorded for
 * the message.
 *
 * The number of tracked messages is bounded by the capacity given at construction; once
 * exceeded, the least recently accessed record is discarded.
 *
 * All public methods synchronize on the tracker instance.
 */
public class DeliveryCountTracker
{
    private static final Logger _logger = LoggerFactory.getLogger(DeliveryCountTracker.class);

    /**
     * Bidirectional Map of JMSMessageID with MessageTag.
     */
    private final BidiMap _jmsIDtoDeliverTag = new TreeBidiMap();

    /**
     * Map of JMSMessageIDs with count of deliveries.
     */
    private final Map<String,Integer> _receivedMsgIDs;

    /**
     * Maximum number of JMSMessageIDs for which counts are retained.
     */
    private final int _capacity;

    /**
     * Creates a new DeliveryCountTracker instance.
     *
     * @param capacity the number of records to track.
     * @throws IllegalArgumentException if specified capacity not > 0
     */
    public DeliveryCountTracker(int capacity) throws IllegalArgumentException
    {
        if(capacity <= 0)
        {
            throw new IllegalArgumentException("Specified capacity must be greater than 0.");
        }
        _capacity = capacity;

        /*
         * HashMap Javadoc states: "If the initial capacity is greater than the maximum number
         * of entries divided by the load factor, no rehash operations will ever occur."
         *
         * Specifying an additional 5% size at construction with a 1.0 load factor to pre-allocate
         * the entries, bound the max map size, and avoid size increases + associated rehashing.
         *
         * The map briefly holds (capacity + 1) entries during an insertion, before the eldest
         * entry is evicted, so at least one spare slot is always requested: for capacities
         * below 20 the 5% margin alone would truncate to nothing. The arithmetic is done in
         * long/double so that very large capacities neither overflow nor lose precision.
         */
        long paddedSize = Math.max((long) capacity + 1L, (long) (capacity * 1.05d));
        int hashMapSize = (int) Math.min(paddedSize, (long) Integer.MAX_VALUE);

        /*
         *  Using the access-ordered LinkedHashMap variant to leverage the LRU based entry removal
         *  behaviour provided when overriding the removeEldestEntry method.
         */
        _receivedMsgIDs = new LinkedHashMap<String,Integer>(hashMapSize, 1.0f, true)
        {
            //Control the max size of the map using LRU based removal upon insertion.
            @Override
            protected boolean removeEldestEntry(Map.Entry<String,Integer> eldest)
            {
                boolean remove = size() > _capacity;

                // If the supplied entry is to be removed, also remove its associated
                // delivery tag
                if(remove)
                {
                    String msgId = eldest.getKey();

                    if (_logger.isDebugEnabled())
                    {
                        _logger.debug("Removing delivery count records for message : " + msgId);
                    }

                    // Guard the tag map with the tracker's own monitor, the same lock the
                    // public methods use when they touch it.
                    synchronized (DeliveryCountTracker.this)
                    {
                        //Also remove the message information from the deliveryTag map.
                        if(msgId != null)
                        {
                            _jmsIDtoDeliverTag.remove(msgId);
                        }
                    }
                }

                return remove;
            }
        };
    }

    /**
     * Record sighting of a particular JMSMessageID, with the given deliveryTag.
     *
     * Any exception raised while recording is logged at warn level and not propagated.
     *
     * @param msgID the JMSMessageID of the message to track; ignored if null
     * @param deliveryTag the delivery tag of the most recent encounter of the message
     */
    public synchronized void recordMessage(String msgID, long deliveryTag)
    {
        try
        {
            if(msgID == null)
            {
                //we can't distinguish between different
                //messages without a JMSMessageID, so skip
                return;
            }

            _jmsIDtoDeliverTag.put(msgID, deliveryTag);

            Integer count = _receivedMsgIDs.get(msgID);

            if(count != null)
            {
                int newCount = count + 1;
                if (_logger.isDebugEnabled())
                {
                    _logger.debug("Incrementing count for JMSMessageID: '" + msgID + "', value now: " + newCount);
                }
                _receivedMsgIDs.put(msgID, newCount);
            }
            else
            {
                if (_logger.isDebugEnabled())
                {
                    _logger.debug("Recording first sighting of JMSMessageID '" + msgID + "'");
                }
                _receivedMsgIDs.put(msgID, 1);
            }
        }
        catch(Exception e)
        {
            // Tracking is best-effort: report the problem but do not propagate it to the caller.
            _logger.warn("Exception recording delivery count for message: " + msgID, e);
        }
    }

    /**
     * Returns the number of times the message related to the given delivery tag has been seen
     *
     * @param deliveryTag delivery tag of the message to retrieve the delivery count for
     * @return the delivery count for that message, or 0 if there is no count known
     */
    public synchronized int getDeliveryCount(long deliveryTag)
    {
        String key = (String) _jmsIDtoDeliverTag.getKey(deliveryTag);

        int count = 0;

        if (key != null)
        {
            Integer val = _receivedMsgIDs.get(key);
            if (val != null)
            {
                count = val;
            }
        }

        return count;
    }

    /**
     * Removes both JMSMessageID and count related records associated with the given deliveryTag if any such records exist.
     * @param deliveryTag the current tag of the message for which the JMSMessageID and count records should be removed
     */
    public synchronized void removeRecordsForMessage(long deliveryTag)
    {
        String msgId = (String) _jmsIDtoDeliverTag.removeValue(deliveryTag);

        if (msgId != null)
        {
            if (_logger.isDebugEnabled())
            {
                _logger.debug("Removed deliveryTag mapping for ID: '" + msgId + "'");
            }

            Integer count = _receivedMsgIDs.remove(msgId);
            if(count != null && _logger.isDebugEnabled())
            {
                _logger.debug("Removed count mapping for ID: '" + msgId + "' : " + count);
            }
        }
    }

    /**
     * Removes both JMSMessageID and count related records associated with the given deliveryTags if any such records exist.
     * A null list is ignored, as are any null elements within the list.
     * @param deliveryTags the current tags of the messages for which the JMSMessageID and count records should be removed
     */
    public synchronized void removeRecordsForMessages(List<Long> deliveryTags)
    {
        if (deliveryTags == null)
        {
            return;
        }

        // Iterate over the boxed values: unboxing a null element in the loop header would
        // throw a NullPointerException and leave the remaining tags unprocessed.
        for(Long tag : deliveryTags)
        {
            if (tag != null)
            {
                removeRecordsForMessage(tag);
            }
        }
    }
}