summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2010-12-07 12:22:54 +0000
committerRobert Gemmell <robbie@apache.org>2010-12-07 12:22:54 +0000
commit2eaa40c0c778d0bedb7225d850da00ac66c04537 (patch)
tree8249d2cfa036ffddce337e8ef5471a358017b5dc
parentffcc6f37389171664f9a6d2479462e3591589080 (diff)
downloadqpid-python-2eaa40c0c778d0bedb7225d850da00ac66c04537.tar.gz
QPID-2971 - onMessage/recieve + recover/rollback handling of Max Delivery Count for 0-8/0-9 consumers
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1042997 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java114
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java28
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java207
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java7
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/DeliveryCountTracker.java196
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/DeliveryCountTrackerTest.java158
6 files changed, 584 insertions, 126 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 9f5ab79ddf..3a542765ca 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -314,6 +314,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
/** All the delivered message tags */
protected ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>();
+ /** The last asynchronously delivered auto-ack message tag */
+ protected Long _lastAsyncAutoAckDeliveryTag = null;
+
/** Holds the dispatcher thread for this session. */
protected Dispatcher _dispatcher;
@@ -821,7 +824,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
"Forced rollback");
}
-
+ ArrayList<Long> ackedMessageTags = new ArrayList<Long>();
+
// Acknowledge all delivered messages
while (true)
{
@@ -832,10 +836,17 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
acknowledgeMessage(tag, false);
+ ackedMessageTags.add(tag);
}
// Commits outstanding messages and acknowledgments
sendCommit();
markClean();
+
+ //remove MaxRedelivery info for the commited deliveryTags to enhance retention of other message tags
+ for(C consumer : _consumers.values())
+ {
+ consumer.removeDeliveryCountRecordsForMessages(ackedMessageTags);
+ }
}
catch (AMQException e)
{
@@ -1631,6 +1642,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_dispatcher.rollback();
}
+ enforceMaxDeliveryCountDuringRecover();
+
sendRecover();
markClean();
@@ -1650,6 +1663,76 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
+ private void enforceMaxDeliveryCountDuringRecover()
+ {
+ ArrayList<C> consumersToCheck = new ArrayList<C>(_consumers.values());
+ Iterator<C> iter = consumersToCheck.iterator();
+ if(!iter.hasNext())
+ {
+ return;
+ }
+
+ //remove any consumers not enforcing MaxDelivery
+ while(iter.hasNext())
+ {
+ C con = iter.next();
+ if(!con.isMaxDeliveryCountEnforced())
+ {
+ iter.remove();
+ }
+ }
+
+ if (consumersToCheck.size() > 0)
+ {
+ //reject(false) any messages we don't want returned again
+ switch(_acknowledgeMode)
+ {
+ case Session.CLIENT_ACKNOWLEDGE:
+ for(long tag : _unacknowledgedMessageTags)
+ {
+ for(C consumer : consumersToCheck)
+ {
+ if(!consumer.shouldRequeueMessage(tag))
+ {
+ //consumer said we should not requeue the message, do reject(false)
+ rejectMessage(tag, false);
+
+ //explicitly remove records for message, we know they wont be used again
+ consumer.removeDeliveryCountRecordsForMessage(tag);
+ //no need to check other consumers now
+ break;
+ }
+ }
+ }
+ break;
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ //fall through
+ case Session.AUTO_ACKNOWLEDGE:
+ //check the last message asynchronously delivered via auto-ack
+ Long tag = getLastAsyncAutoAckDeliveryTag();
+ clearLastAsyncAutoAckDeliveryTag();
+
+ if(tag != null)
+ {
+ for(C consumer : consumersToCheck)
+ {
+ if(consumer.isMessageListenerSet() && !consumer.shouldRequeueMessage(tag))
+ {
+ //consumer said we should not requeue the message, do reject(false)
+ rejectMessage(tag, false);
+
+ //explicitly remove records for message, we know they wont be used again
+ consumer.removeDeliveryCountRecordsForMessage(tag);
+ //no need to check other consumers now
+ break;
+ }
+ }
+ }
+ break;
+ }
+ }
+ }
+
protected abstract void sendRecover() throws AMQException, FailoverException;
public void rejectMessage(UnprocessedMessage message, boolean requeue)
@@ -2046,7 +2129,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
void syncDispatchQueue()
{
- if (Thread.currentThread() == _dispatcherThread)
+ if (isDispatcherThread())
{
while (!_closed.get() && !_queue.isEmpty())
{
@@ -2154,7 +2237,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
void startDispatcherIfNecessary()
{
//If we are the dispatcher then we don't need to check we are started
- if (Thread.currentThread() == _dispatcherThread)
+ if (isDispatcherThread())
{
return;
}
@@ -2762,11 +2845,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_producers.put(new Long(producerId), producer);
}
- private void rejectAllMessages(boolean requeue)
- {
- rejectMessagesForConsumerTag(0, requeue, true);
- }
-
/**
* @param consumerTag The consumerTag to prune from queue or all if null
* @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ)
@@ -3307,4 +3385,24 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
return _closing.get()|| _connection.isClosing();
}
+
+ public void setLastAsyncAutoAckDeliveryTag(Long tag)
+ {
+ _lastAsyncAutoAckDeliveryTag = tag;
+ }
+
+ public Long getLastAsyncAutoAckDeliveryTag()
+ {
+ return _lastAsyncAutoAckDeliveryTag;
+ }
+
+ public void clearLastAsyncAutoAckDeliveryTag()
+ {
+ _lastAsyncAutoAckDeliveryTag = null;
+ }
+
+ protected boolean isDispatcherThread()
+ {
+ return Thread.currentThread() == _dispatcherThread;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index fa048c9997..c7a020c970 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.client;
-
import java.util.Map;
import javax.jms.Destination;
@@ -241,17 +240,38 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
break;
}
- rejectMessage(tag, true);
+ boolean shouldRequeue = true;
+ BasicMessageConsumer_0_8 rejectingConsumer = null;
+
+ for(BasicMessageConsumer_0_8 consumer : _consumers.values())
+ {
+ shouldRequeue = consumer.shouldRequeueMessage(tag);
+ if(!shouldRequeue)
+ {
+ rejectingConsumer = consumer;
+
+ //no need to consult other consumers now, it is rejected.
+ break;
+ }
+ }
+
+ rejectMessage(tag, shouldRequeue);
+ if(!shouldRequeue)
+ {
+ //explicitly remove records for message, we know they wont be used again
+ rejectingConsumer.removeDeliveryCountRecordsForMessage(tag);
+ }
}
}
public void rejectMessage(long deliveryTag, boolean requeue)
{
- if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED))
+ if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED) ||
+ ((_acknowledgeMode == AUTO_ACKNOWLEDGE || _acknowledgeMode == DUPS_OK_ACKNOWLEDGE ) && hasMessageListeners()))
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting delivery tag:" + deliveryTag + ":SessionHC:" + this.hashCode());
+ _logger.debug("Rejecting delivery tag:" + deliveryTag + ":ReQueue:" + requeue + ":SessionHC:" + this.hashCode());
}
BasicRejectBody body = getMethodRegistry().createBasicRejectBody(deliveryTag, requeue);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index bea43cc232..8d43bef4ec 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -33,15 +33,11 @@ import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
+
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-import java.util.SortedSet;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -116,29 +112,11 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
*/
protected final int _acknowledgeMode;
- /**
- * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
- */
- private int _outstanding;
-
- /**
- * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding
- * number of msgs >= _prefetchHigh and disabled at < _prefetchLow
- */
- private boolean _dups_ok_acknowledge_send;
-
- /**
- * List of tags delievered, The last of which which should be acknowledged on commit in transaction mode.
- */
- private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
-
- /** The last tag that was "multiple" acknowledged on this session (if transacted) */
- private long _lastAcked;
-
- /** set of tags which have previously been acked; but not part of the multiple ack (transacted mode only) */
- private final SortedSet<Long> _previouslyAcked = new TreeSet<Long>();
+ private int _idMapSize = 100;//TODO: set by configuration
+ private int _maxDeliveryAttempts = 3; //TODO: set by configuration
+ private boolean _maxRedeliverEnabled = true;//TODO set based on above config
- private final Object _commitLock = new Object();
+ final DeliveryCountTracker _tracker = new DeliveryCountTracker(_idMapSize);//TODO
/**
* The thread that was used to call receive(). This is important for being able to interrupt that thread if a
@@ -757,19 +735,69 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
}
+ private void recordMessageID(AbstractJMSMessage message)
+ {
+ if(!isMaxDeliveryCountEnforced())
+ {
+ return;
+ }
+
+ String msgId = null;
+
+ try
+ {
+ msgId = message.getJMSMessageID();
+ if(msgId != null)
+ {
+ _tracker.recordMessage(msgId, message.getDeliveryTag());
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.warn("Exception while retrieving JMSMessageID from message" +
+ " with deliveryTag '" + message.getDeliveryTag() + "': " + e, e);
+ }
+ }
+
+ protected boolean shouldRequeueMessage(long deliveryTag)
+ {
+ if(!isMaxDeliveryCountEnforced())
+ {
+ return true;
+ }
+
+ int count = _tracker.getDeliveryCount(deliveryTag);
+
+ boolean reQueue = count < _maxDeliveryAttempts;
+
+ return reQueue;
+ }
+
void preDeliver(AbstractJMSMessage msg)
{
switch (_acknowledgeMode)
{
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ //fall through
+ case Session.AUTO_ACKNOWLEDGE:
+ if(isMessageListenerSet())
+ {
+ //keep track of JMSMessageIDs handed to the client
+ recordMessageID(msg);
+ _session.setLastAsyncAutoAckDeliveryTag(msg.getDeliveryTag());
+ }
+ break;
case Session.PRE_ACKNOWLEDGE:
_session.acknowledgeMessage(msg.getDeliveryTag(), false);
break;
-
case Session.CLIENT_ACKNOWLEDGE:
// we set the session so that when the user calls acknowledge() it can call the method on session
// to send out the appropriate frame
msg.setAMQSession(_session);
+
+ //keep track of JMSMessageIDs handed to the client
+ recordMessageID(msg);
break;
case Session.SESSION_TRANSACTED:
if (isNoConsume())
@@ -780,18 +808,20 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
{
_session.addDeliveredMessage(msg.getDeliveryTag());
_session.markDirty();
+
+ //keep track of JMSMessageIDs handed to the client
+ recordMessageID(msg);
}
break;
}
}
-
+
void postDeliver(AbstractJMSMessage msg) throws JMSException
{
switch (_acknowledgeMode)
{
-
case Session.CLIENT_ACKNOWLEDGE:
if (isNoConsume())
{
@@ -799,101 +829,26 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
_session.markDirty();
break;
-
case Session.DUPS_OK_ACKNOWLEDGE:
+ //fall through
case Session.AUTO_ACKNOWLEDGE:
// we do not auto ack a message if the application code called recover()
if (!_session.isInRecovery())
{
_session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
- break;
- }
- }
-
-
- /**
- * Acknowledge up to last message delivered (if any). Used when commiting.
- *
- * @return the lastDeliveryTag to acknowledge
- */
- Long getLastDelivered()
- {
- if (!_receivedDeliveryTags.isEmpty())
- {
- Long lastDeliveryTag = _receivedDeliveryTags.poll();
-
- while (!_receivedDeliveryTags.isEmpty())
- {
- lastDeliveryTag = _receivedDeliveryTags.poll();
- }
-
- assert _receivedDeliveryTags.isEmpty();
-
- return lastDeliveryTag;
- }
-
- return null;
- }
-
- /**
- * Acknowledge up to last message delivered (if any). Used when commiting.
- */
- void acknowledgeDelivered()
- {
- synchronized(_commitLock)
- {
- ArrayList<Long> tagsToAck = new ArrayList<Long>();
-
- while (!_receivedDeliveryTags.isEmpty())
- {
- tagsToAck.add(_receivedDeliveryTags.poll());
- }
-
- Collections.sort(tagsToAck);
-
- long prevAcked = _lastAcked;
- long oldAckPoint = -1;
-
- while(oldAckPoint != prevAcked)
- {
- oldAckPoint = prevAcked;
-
- Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
-
- while(tagsToAckIterator.hasNext() && tagsToAckIterator.next() == prevAcked+1)
- {
- tagsToAckIterator.remove();
- prevAcked++;
- }
+ if(isMessageListenerSet())
+ {
+ _session.clearLastAsyncAutoAckDeliveryTag();
- Iterator<Long> previousAckIterator = _previouslyAcked.iterator();
- while(previousAckIterator.hasNext() && previousAckIterator.next() == prevAcked+1)
- {
- previousAckIterator.remove();
- prevAcked++;
+ //explicitly remove records for message, we know they wont be used again
+ removeDeliveryCountRecordsForMessage(msg.getDeliveryTag());
+ }
}
-
- }
- if(prevAcked != _lastAcked)
- {
- _session.acknowledgeMessage(prevAcked, true);
- _lastAcked = prevAcked;
- }
-
- Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
-
- while(tagsToAckIterator.hasNext())
- {
- Long tag = tagsToAckIterator.next();
- _session.acknowledgeMessage(tag, false);
- _previouslyAcked.add(tag);
- }
+ break;
}
}
-
void notifyError(Throwable cause)
{
// synchronized (_closed)
@@ -1019,7 +974,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
iterator.remove();
removed = true;
}
- }
+ }
if (removed && (initialSize == _synchronousQueue.size()))
{
@@ -1072,4 +1027,28 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
public void failedOverPost() {}
+ public boolean isMaxDeliveryCountEnforced()
+ {
+ return _maxRedeliverEnabled;
+ }
+
+ public int getMaxDeliveryAttempts()
+ {
+ return _maxDeliveryAttempts;
+ }
+ public void removeDeliveryCountRecordsForMessage(long deliveryTag)
+ {
+ if(isMaxDeliveryCountEnforced())
+ {
+ _tracker.removeRecordsForMessage(deliveryTag);
+ }
+ }
+
+ public void removeDeliveryCountRecordsForMessages(List<Long> deliveryTags)
+ {
+ if(isMaxDeliveryCountEnforced())
+ {
+ _tracker.removeRecordsForMessages(deliveryTags);
+ }
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 1b2d6876bd..ffd3957b38 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -457,4 +457,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
clearReceiveQueue();
}
}
+
+ @Override
+ public boolean isMaxDeliveryCountEnforced()
+ {
+ //TODO Implement for 0-10 consumers.
+ return false;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/DeliveryCountTracker.java b/qpid/java/client/src/main/java/org/apache/qpid/client/DeliveryCountTracker.java
new file mode 100644
index 0000000000..af01b26aec
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/DeliveryCountTracker.java
@@ -0,0 +1,196 @@
+package org.apache.qpid.client;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.collections.BidiMap;
+import org.apache.commons.collections.bidimap.TreeBidiMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DeliveryCountTracker
+{
+ private static final Logger _logger = LoggerFactory.getLogger(DeliveryCountTracker.class);
+
+ /**
+ * Bidirectional Map of JMSMessageID with MessageTag.
+ */
+ private BidiMap _jmsIDtoDeliverTag = new TreeBidiMap();
+
+ /**
+ * Map of JMSMessageIDs with count of deliveries.
+ */
+ private Map<String,Integer> _receivedMsgIDs;
+
+ private int _capacity;
+
+ /**
+ * Creates a new DeliveryCountTracker instance.
+ *
+ * @param capacity the number of records to track.
+ * @throws IllegalArgumentException if specified capacity not > 0
+ */
+ public DeliveryCountTracker(int capacity) throws IllegalArgumentException
+ {
+ if(capacity <= 0)
+ {
+ throw new IllegalArgumentException("Specified capacity must be greater than 0.");
+ }
+ _capacity = capacity;
+
+ /*
+ * HashMap Javadoc states: "If the initial capacity is greater than the maximum number
+ * of entries divided by the load factor, no rehash operations will ever occur."
+ *
+ * Specifying an additional 5% size at construction with a 1.0 load factor to pre-allocate
+ * the entries, bound the max map size, and avoid size increases + associated rehashing.
+ */
+ int hashMapSize = (int)(_capacity * 1.05f);
+
+ /*
+ * Using the access-ordered LinkedHashMap variant to leverage the LRU based entry removal
+ * behaviour provided when then overriding the removeEldestEntry method.
+ */
+ _receivedMsgIDs = new LinkedHashMap<String,Integer>(hashMapSize, 1.0f, true)
+ {
+ //Control the max size of the map using LRU based removal upon insertion.
+ protected boolean removeEldestEntry(Map.Entry<String,Integer> eldest)
+ {
+ boolean remove = size() > _capacity;
+
+ // If the supplied entry is to be removed, also remove its associated
+ // delivery tag
+ if(remove)
+ {
+ String msgId = eldest.getKey();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Removing delivery count records for message : " + msgId);
+ }
+
+ synchronized (DeliveryCountTracker.this)
+ {
+ //Also remove the message information from the deliveryTag map.
+ if(msgId != null)
+ {
+ _jmsIDtoDeliverTag.remove(msgId);
+ }
+ }
+ }
+
+ return remove;
+ }
+ };
+ }
+
+ /**
+ * Record sighting of a particular JMSMessageID, with the given deliveryTag.
+ *
+ * @param msgID the JMSMessageID of the message to track
+ * @param deliveryTag the delivery tag of the most recent encounter of the message
+ */
+ public synchronized void recordMessage(String msgID, long deliveryTag)
+ {
+ try
+ {
+ if(msgID == null)
+ {
+ //we can't distinguish between different
+ //messages without a JMSMessageID, so skip
+ return;
+ }
+
+ _jmsIDtoDeliverTag.put(msgID, deliveryTag);
+
+ Integer count = _receivedMsgIDs.get(msgID);
+
+ if(count != null)
+ {
+ ++count;
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Incrementing count for JMSMessageID: '" + msgID + "', value now: " + count);
+ }
+ _receivedMsgIDs.put(msgID, count);
+ }
+ else
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Recording first sighting of JMSMessageID '" + msgID + "'");
+ }
+ _receivedMsgIDs.put(msgID, 1);
+ }
+ }
+ catch(Exception e)
+ {
+ _logger.warn("Exception recording delivery count for message: " + msgID, e);
+ }
+ }
+
+ /**
+ * Returns the number of times the message related to the given delivery tag has been seen
+ *
+ * @param deliveryTag delivery tag of the message to retrieve the delivery count for
+ * @return the delivery count for that message, or 0 if there is no count known
+ */
+ public synchronized int getDeliveryCount(long deliveryTag)
+ {
+ String key = (String) _jmsIDtoDeliverTag.getKey(deliveryTag);
+
+ int count = 0;
+
+ if (key != null)
+ {
+ Integer val = _receivedMsgIDs.get(key);
+ if (val != null)
+ {
+ count = val;
+ }
+ }
+
+ return count;
+ }
+
+ /**
+ * Removes both JMSMessageID and count related records associated with the given deliveryTag if any such records exist.
+ * @param deliveryTag the current tag of the message for which the JMSMessageID and count records should be removed
+ */
+ public synchronized void removeRecordsForMessage(long deliveryTag)
+ {
+ String msgId = (String) _jmsIDtoDeliverTag.removeValue(deliveryTag);
+
+ if (msgId != null)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Removed deliveryTag mapping for ID: '" + msgId + "'");
+ }
+
+ Integer count = _receivedMsgIDs.remove(msgId);
+ if(count != null && _logger.isDebugEnabled())
+ {
+ _logger.debug("Removed count mapping for ID: '" + msgId + "' : " + count);
+ }
+ }
+ }
+
+ /**
+ * Removes both JMSMessageID and count related records associated with the given deliveryTags if any such records exist.
+ * @param deliveryTags the current tags of the messages for which the JMSMessageID and count records should be removed
+ */
+ public synchronized void removeRecordsForMessages(List<Long> deliveryTags)
+ {
+ if (deliveryTags == null)
+ {
+ return;
+ }
+
+ for(long tag : deliveryTags)
+ {
+ removeRecordsForMessage(tag);
+ }
+ }
+}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/DeliveryCountTrackerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/DeliveryCountTrackerTest.java
new file mode 100644
index 0000000000..2f3a186d82
--- /dev/null
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/DeliveryCountTrackerTest.java
@@ -0,0 +1,158 @@
+package org.apache.qpid.client;
+
+import java.util.ArrayList;
+
+import junit.framework.TestCase;
+
+public class DeliveryCountTrackerTest extends TestCase
+{
+ private DeliveryCountTracker _tracker;
+ private final int CAPACITY=50;
+
+ protected void setUp()
+ {
+ _tracker = new DeliveryCountTracker(CAPACITY);
+ }
+
+ /**
+ * Test the LRU based eviction policy of the tracker. Both the process of tracking new sightings of a given
+ * JMSMessageID and retrieving the existing count will involve accessing an existing record and making it
+ * the most recently accessed. Commit/Rollback/Recover should remove any messages that can't be seen again
+ * due to consumption or rejection. Any other messages must be evicted by LRU policy as and when necessary
+ * to make way for new entries.
+ *
+ * Test this by validating that upon tracking one more message than the capacity, the first message count
+ * is lost. Then access the second message count and insert a further new message occurrence. Verify the
+ * third message count is removed and not the second message count.
+ */
+ public void testLRUeviction()
+ {
+ long id;
+
+ for(id=1; id <= CAPACITY + 1; id ++)
+ {
+ _tracker.recordMessage(String.valueOf(id), id);
+ }
+
+ assertEquals("Count was not as expected. First delivery tag " +
+ "should have been evicted already:", _tracker.getDeliveryCount(1L), 0L);
+
+ //Retrieve second delivery tag count, ensure it is not zero.
+ //This will also make it the most recently accessed record.
+ assertEquals("Count was not as expected.", _tracker.getDeliveryCount(2L), 1L);
+
+ //Add a new record, check that tag 2 remains and tag 3 was evicted.
+ _tracker.recordMessage(String.valueOf(id), id);
+ assertEquals("Count was not as expected. Second delivery tag " +
+ "should NOT have been evicted already:", _tracker.getDeliveryCount(2L), 1L);
+ assertEquals("Count was not as expected. Third delivery tag " +
+ "should have been evicted already:", _tracker.getDeliveryCount(3L), 0L);
+ }
+
+ /**
+ * Test that once it is known a record can never be useful again it can be successfully removed
+ * from the records to allow room for new records without causing eviction of information that
+ * could still be useful.
+ *
+ * Fill the tracker with records, ensure the counts are correct and then delete them, and ensure the
+ * counts get reset.
+ */
+ public void testMessageRecordRemoval()
+ {
+ long id;
+
+ for(id=1 ; id <= CAPACITY; id ++)
+ {
+ _tracker.recordMessage(String.valueOf(id), id);
+ }
+
+ assertEquals("Count was not as expected.", _tracker.getDeliveryCount(1L), 1L);
+ assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY/2), 1L);
+ assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY-1), 1L);
+
+ //remove records for first deliveryTag, ensure the others remain as expected
+ _tracker.removeRecordsForMessage(1L);
+ assertEquals("Count was not as expected.", _tracker.getDeliveryCount(1L), 0L);
+ assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY/2), 1L);
+ assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY-1), 1L);
+
+ //remove records for next deliveryTag, ensure the others remain as expected
+ _tracker.removeRecordsForMessage(CAPACITY/2);
+ assertEquals("Count was not as expected.", _tracker.getDeliveryCount(1L), 0L);
+ assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY/2), 0L);
+ assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY-1), 1L);
+
+ //remove records for next deliveryTag, ensure the others remain as expected
+ _tracker.removeRecordsForMessage(CAPACITY-1);
+ assertEquals("Count was not as expected.", _tracker.getDeliveryCount(1L), 0L);
+ assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY/2), 0L);
+ assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY-1), 0L);
+
+ //ensure records for last deliveryTag is still as expected
+ assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY), 1L);
+ }
+
+ /**
+ * Test that counts are accurately incremented and associated with the new deliveryTag when
+ * a message with the same JMSMessageID is encountered again, also ensuring that record of
+ * the count is no longer returned via the old deliveryTag
+ */
+ public void testCounting()
+ {
+ long id;
+
+ for(id=1 ; id <= CAPACITY; id ++)
+ {
+ _tracker.recordMessage(String.valueOf(id), id);
+ }
+
+ //verify all counts are currently 1
+ ArrayList<Long> exclusions = new ArrayList<Long>();
+ verifyCounts(1,exclusions, CAPACITY);
+
+ //Gather some of the existing JMSMessageIDs and create new deliveryTags for them, which can
+ //be used to represent receiving the same message again.
+ String msgId1 = String.valueOf(1L);
+ long newTag1 = id;
+ String msgId2 = String.valueOf(CAPACITY/2);
+ long newTag2 = ++id;
+ String msgId3 = String.valueOf(CAPACITY);
+ long newTag3 = ++id;
+ _tracker.recordMessage(String.valueOf(msgId1), newTag1);
+ _tracker.recordMessage(String.valueOf(msgId2), newTag2);
+ _tracker.recordMessage(String.valueOf(msgId3), newTag3);
+
+ //Now check the updated values returned are as expected.
+
+ //entries for delivery tags with value 1,CAPACITY/2,CAPACITY should have just been removed
+ //because new delivery tags associated with the same JMSMessageID were just recorded.
+ assertEquals("Count was not as expected.", 0L, _tracker.getDeliveryCount(1));
+ assertEquals("Count was not as expected.", 0L, _tracker.getDeliveryCount(CAPACITY/2));
+ assertEquals("Count was not as expected.", 0L, _tracker.getDeliveryCount(CAPACITY));
+
+ //The count for the 'redelivered' messages with new deliveryTag should have increased.
+ assertEquals("Count was not as expected.", 2L, _tracker.getDeliveryCount(newTag1));
+ assertEquals("Count was not as expected.", 2L, _tracker.getDeliveryCount(newTag2));
+ assertEquals("Count was not as expected.", 2L, _tracker.getDeliveryCount(newTag3));
+
+ //all the other delivery tags should remain at 1:
+ exclusions.add(1L);
+ exclusions.add((long) CAPACITY/2);
+ exclusions.add((long) CAPACITY);
+ exclusions.add(newTag1);
+ exclusions.add(newTag2);
+ exclusions.add(newTag3);
+ verifyCounts(1,exclusions, CAPACITY+3);
+ }
+
+ private void verifyCounts(long expectedValue, ArrayList<Long> excludeFromCheck, long numValues)
+ {
+ for(long id=1 ; id <= numValues; id ++)
+ {
+ if (!excludeFromCheck.contains(id))
+ {
+ assertEquals("Count was not as expected for id '" + id + "'.", 1L, _tracker.getDeliveryCount(id));
+ }
+ }
+ }
+}