diff options
author | Robert Gemmell <robbie@apache.org> | 2010-12-07 12:22:54 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2010-12-07 12:22:54 +0000 |
commit | 2eaa40c0c778d0bedb7225d850da00ac66c04537 (patch) | |
tree | 8249d2cfa036ffddce337e8ef5471a358017b5dc | |
parent | ffcc6f37389171664f9a6d2479462e3591589080 (diff) | |
download | qpid-python-2eaa40c0c778d0bedb7225d850da00ac66c04537.tar.gz |
QPID-2971 - onMessage/recieve + recover/rollback handling of Max Delivery Count for 0-8/0-9 consumers
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1042997 13f79535-47bb-0310-9956-ffa450edef68
6 files changed, 584 insertions, 126 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 9f5ab79ddf..3a542765ca 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -314,6 +314,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** All the delivered message tags */ protected ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>(); + /** The last asynchronously delivered auto-ack message tag */ + protected Long _lastAsyncAutoAckDeliveryTag = null; + /** Holds the dispatcher thread for this session. */ protected Dispatcher _dispatcher; @@ -821,7 +824,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic "Forced rollback"); } - + ArrayList<Long> ackedMessageTags = new ArrayList<Long>(); + // Acknowledge all delivered messages while (true) { @@ -832,10 +836,17 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } acknowledgeMessage(tag, false); + ackedMessageTags.add(tag); } // Commits outstanding messages and acknowledgments sendCommit(); markClean(); + + //remove MaxRedelivery info for the commited deliveryTags to enhance retention of other message tags + for(C consumer : _consumers.values()) + { + consumer.removeDeliveryCountRecordsForMessages(ackedMessageTags); + } } catch (AMQException e) { @@ -1631,6 +1642,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _dispatcher.rollback(); } + enforceMaxDeliveryCountDuringRecover(); + sendRecover(); markClean(); @@ -1650,6 +1663,76 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } + private void enforceMaxDeliveryCountDuringRecover() + { + ArrayList<C> consumersToCheck = new ArrayList<C>(_consumers.values()); + Iterator<C> iter = consumersToCheck.iterator(); + if(!iter.hasNext()) + { + return; + } + + //remove any consumers not enforcing MaxDelivery + while(iter.hasNext()) + { + C con = iter.next(); + if(!con.isMaxDeliveryCountEnforced()) + { + iter.remove(); + } + } + + if (consumersToCheck.size() > 0) + { + //reject(false) any messages we don't want returned again + switch(_acknowledgeMode) + { + case Session.CLIENT_ACKNOWLEDGE: + for(long tag : _unacknowledgedMessageTags) + { + for(C consumer : consumersToCheck) + { + if(!consumer.shouldRequeueMessage(tag)) + { + //consumer said we should not requeue the message, do reject(false) + rejectMessage(tag, false); + + //explicitly remove records for message, we know they wont be used again + consumer.removeDeliveryCountRecordsForMessage(tag); + //no need to check other consumers now + break; + } + } + } + break; + case Session.DUPS_OK_ACKNOWLEDGE: + //fall through + case Session.AUTO_ACKNOWLEDGE: + //check the last message asynchronously delivered via auto-ack + Long tag = getLastAsyncAutoAckDeliveryTag(); + clearLastAsyncAutoAckDeliveryTag(); + + if(tag != null) + { + for(C consumer : consumersToCheck) + { + if(consumer.isMessageListenerSet() && !consumer.shouldRequeueMessage(tag)) + { + //consumer said we should not requeue the message, do reject(false) + rejectMessage(tag, false); + + //explicitly remove records for message, we know they wont be used again + consumer.removeDeliveryCountRecordsForMessage(tag); + //no need to check other consumers now + break; + } + } + } + break; + } + } + } + protected abstract void sendRecover() throws AMQException, FailoverException; public void rejectMessage(UnprocessedMessage message, boolean requeue) @@ -2046,7 +2129,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic void syncDispatchQueue() { - if (Thread.currentThread() == _dispatcherThread) + if (isDispatcherThread()) { while (!_closed.get() && !_queue.isEmpty()) { @@ -2154,7 +2237,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic void startDispatcherIfNecessary() { //If we are the dispatcher then we don't need to check we are started - if (Thread.currentThread() == _dispatcherThread) + if (isDispatcherThread()) { return; } @@ -2762,11 +2845,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _producers.put(new Long(producerId), producer); } - private void rejectAllMessages(boolean requeue) - { - rejectMessagesForConsumerTag(0, requeue, true); - } - /** * @param consumerTag The consumerTag to prune from queue or all if null * @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ) @@ -3307,4 +3385,24 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { return _closing.get()|| _connection.isClosing(); } + + public void setLastAsyncAutoAckDeliveryTag(Long tag) + { + _lastAsyncAutoAckDeliveryTag = tag; + } + + public Long getLastAsyncAutoAckDeliveryTag() + { + return _lastAsyncAutoAckDeliveryTag; + } + + public void clearLastAsyncAutoAckDeliveryTag() + { + _lastAsyncAutoAckDeliveryTag = null; + } + + protected boolean isDispatcherThread() + { + return Thread.currentThread() == _dispatcherThread; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index fa048c9997..c7a020c970 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.client; - import java.util.Map; import javax.jms.Destination; @@ -241,17 +240,38 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B break; } - rejectMessage(tag, true); + boolean shouldRequeue = true; + BasicMessageConsumer_0_8 rejectingConsumer = null; + + for(BasicMessageConsumer_0_8 consumer : _consumers.values()) + { + shouldRequeue = consumer.shouldRequeueMessage(tag); + if(!shouldRequeue) + { + rejectingConsumer = consumer; + + //no need to consult other consumers now, it is rejected. + break; + } + } + + rejectMessage(tag, shouldRequeue); + if(!shouldRequeue) + { + //explicitly remove records for message, we know they wont be used again + rejectingConsumer.removeDeliveryCountRecordsForMessage(tag); + } } } public void rejectMessage(long deliveryTag, boolean requeue) { - if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED)) + if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED) || + ((_acknowledgeMode == AUTO_ACKNOWLEDGE || _acknowledgeMode == DUPS_OK_ACKNOWLEDGE ) && hasMessageListeners())) { if (_logger.isDebugEnabled()) { - _logger.debug("Rejecting delivery tag:" + deliveryTag + ":SessionHC:" + this.hashCode()); + _logger.debug("Rejecting delivery tag:" + deliveryTag + ":ReQueue:" + requeue + ":SessionHC:" + this.hashCode()); } BasicRejectBody body = getMethodRegistry().createBasicRejectBody(deliveryTag, requeue); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index bea43cc232..8d43bef4ec 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -33,15 +33,11 @@ import org.slf4j.LoggerFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; + import java.util.Arrays; import java.util.Iterator; import java.util.List; -import java.util.SortedSet; -import java.util.ArrayList; -import java.util.Collections; -import java.util.TreeSet; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -116,29 +112,11 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa */ protected final int _acknowledgeMode; - /** - * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode - */ - private int _outstanding; - - /** - * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding - * number of msgs >= _prefetchHigh and disabled at < _prefetchLow - */ - private boolean _dups_ok_acknowledge_send; - - /** - * List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. - */ - private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>(); - - /** The last tag that was "multiple" acknowledged on this session (if transacted) */ - private long _lastAcked; - - /** set of tags which have previously been acked; but not part of the multiple ack (transacted mode only) */ - private final SortedSet<Long> _previouslyAcked = new TreeSet<Long>(); + private int _idMapSize = 100;//TODO: set by configuration + private int _maxDeliveryAttempts = 3; //TODO: set by configuration + private boolean _maxRedeliverEnabled = true;//TODO set based on above config - private final Object _commitLock = new Object(); + final DeliveryCountTracker _tracker = new DeliveryCountTracker(_idMapSize);//TODO /** * The thread that was used to call receive(). This is important for being able to interrupt that thread if a @@ -757,19 +735,69 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa } } + private void recordMessageID(AbstractJMSMessage message) + { + if(!isMaxDeliveryCountEnforced()) + { + return; + } + + String msgId = null; + + try + { + msgId = message.getJMSMessageID(); + if(msgId != null) + { + _tracker.recordMessage(msgId, message.getDeliveryTag()); + } + } + catch (JMSException e) + { + _logger.warn("Exception while retrieving JMSMessageID from message" + + " with deliveryTag '" + message.getDeliveryTag() + "': " + e, e); + } + } + + protected boolean shouldRequeueMessage(long deliveryTag) + { + if(!isMaxDeliveryCountEnforced()) + { + return true; + } + + int count = _tracker.getDeliveryCount(deliveryTag); + + boolean reQueue = count < _maxDeliveryAttempts; + + return reQueue; + } + void preDeliver(AbstractJMSMessage msg) { switch (_acknowledgeMode) { + case Session.DUPS_OK_ACKNOWLEDGE: + //fall through + case Session.AUTO_ACKNOWLEDGE: + if(isMessageListenerSet()) + { + //keep track of JMSMessageIDs handed to the client + recordMessageID(msg); + _session.setLastAsyncAutoAckDeliveryTag(msg.getDeliveryTag()); + } + break; case Session.PRE_ACKNOWLEDGE: _session.acknowledgeMessage(msg.getDeliveryTag(), false); break; - case Session.CLIENT_ACKNOWLEDGE: // we set the session so that when the user calls acknowledge() it can call the method on session // to send out the appropriate frame msg.setAMQSession(_session); + + //keep track of JMSMessageIDs handed to the client + recordMessageID(msg); break; case Session.SESSION_TRANSACTED: if (isNoConsume()) @@ -780,18 +808,20 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa { _session.addDeliveredMessage(msg.getDeliveryTag()); _session.markDirty(); + + //keep track of JMSMessageIDs handed to the client + recordMessageID(msg); } break; } } - + void postDeliver(AbstractJMSMessage msg) throws JMSException { switch (_acknowledgeMode) { - case Session.CLIENT_ACKNOWLEDGE: if (isNoConsume()) { @@ -799,101 +829,26 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa } _session.markDirty(); break; - case Session.DUPS_OK_ACKNOWLEDGE: + //fall through case Session.AUTO_ACKNOWLEDGE: // we do not auto ack a message if the application code called recover() if (!_session.isInRecovery()) { _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } - break; - } - } - - - /** - * Acknowledge up to last message delivered (if any). Used when commiting. - * - * @return the lastDeliveryTag to acknowledge - */ - Long getLastDelivered() - { - if (!_receivedDeliveryTags.isEmpty()) - { - Long lastDeliveryTag = _receivedDeliveryTags.poll(); - - while (!_receivedDeliveryTags.isEmpty()) - { - lastDeliveryTag = _receivedDeliveryTags.poll(); - } - - assert _receivedDeliveryTags.isEmpty(); - - return lastDeliveryTag; - } - - return null; - } - - /** - * Acknowledge up to last message delivered (if any). Used when commiting. - */ - void acknowledgeDelivered() - { - synchronized(_commitLock) - { - ArrayList<Long> tagsToAck = new ArrayList<Long>(); - - while (!_receivedDeliveryTags.isEmpty()) - { - tagsToAck.add(_receivedDeliveryTags.poll()); - } - - Collections.sort(tagsToAck); - - long prevAcked = _lastAcked; - long oldAckPoint = -1; - - while(oldAckPoint != prevAcked) - { - oldAckPoint = prevAcked; - - Iterator<Long> tagsToAckIterator = tagsToAck.iterator(); - - while(tagsToAckIterator.hasNext() && tagsToAckIterator.next() == prevAcked+1) - { - tagsToAckIterator.remove(); - prevAcked++; - } + if(isMessageListenerSet()) + { + _session.clearLastAsyncAutoAckDeliveryTag(); - Iterator<Long> previousAckIterator = _previouslyAcked.iterator(); - while(previousAckIterator.hasNext() && previousAckIterator.next() == prevAcked+1) - { - previousAckIterator.remove(); - prevAcked++; + //explicitly remove records for message, we know they wont be used again + removeDeliveryCountRecordsForMessage(msg.getDeliveryTag()); + } } - - } - if(prevAcked != _lastAcked) - { - _session.acknowledgeMessage(prevAcked, true); - _lastAcked = prevAcked; - } - - Iterator<Long> tagsToAckIterator = tagsToAck.iterator(); - - while(tagsToAckIterator.hasNext()) - { - Long tag = tagsToAckIterator.next(); - _session.acknowledgeMessage(tag, false); - _previouslyAcked.add(tag); - } + break; } } - void notifyError(Throwable cause) { // synchronized (_closed) @@ -1019,7 +974,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa iterator.remove(); removed = true; } - } + } if (removed && (initialSize == _synchronousQueue.size())) { @@ -1072,4 +1027,28 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa public void failedOverPost() {} + public boolean isMaxDeliveryCountEnforced() + { + return _maxRedeliverEnabled; + } + + public int getMaxDeliveryAttempts() + { + return _maxDeliveryAttempts; + } + public void removeDeliveryCountRecordsForMessage(long deliveryTag) + { + if(isMaxDeliveryCountEnforced()) + { + _tracker.removeRecordsForMessage(deliveryTag); + } + } + + public void removeDeliveryCountRecordsForMessages(List<Long> deliveryTags) + { + if(isMaxDeliveryCountEnforced()) + { + _tracker.removeRecordsForMessages(deliveryTags); + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 1b2d6876bd..ffd3957b38 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -457,4 +457,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM clearReceiveQueue(); } } + + @Override + public boolean isMaxDeliveryCountEnforced() + { + //TODO Implement for 0-10 consumers. + return false; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/DeliveryCountTracker.java b/qpid/java/client/src/main/java/org/apache/qpid/client/DeliveryCountTracker.java new file mode 100644 index 0000000000..af01b26aec --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/DeliveryCountTracker.java @@ -0,0 +1,196 @@ +package org.apache.qpid.client; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.collections.BidiMap; +import org.apache.commons.collections.bidimap.TreeBidiMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DeliveryCountTracker +{ + private static final Logger _logger = LoggerFactory.getLogger(DeliveryCountTracker.class); + + /** + * Bidirectional Map of JMSMessageID with MessageTag. + */ + private BidiMap _jmsIDtoDeliverTag = new TreeBidiMap(); + + /** + * Map of JMSMessageIDs with count of deliveries. + */ + private Map<String,Integer> _receivedMsgIDs; + + private int _capacity; + + /** + * Creates a new DeliveryCountTracker instance. + * + * @param capacity the number of records to track. + * @throws IllegalArgumentException if specified capacity not > 0 + */ + public DeliveryCountTracker(int capacity) throws IllegalArgumentException + { + if(capacity <= 0) + { + throw new IllegalArgumentException("Specified capacity must be greater than 0."); + } + _capacity = capacity; + + /* + * HashMap Javadoc states: "If the initial capacity is greater than the maximum number + * of entries divided by the load factor, no rehash operations will ever occur." + * + * Specifying an additional 5% size at construction with a 1.0 load factor to pre-allocate + * the entries, bound the max map size, and avoid size increases + associated rehashing. + */ + int hashMapSize = (int)(_capacity * 1.05f); + + /* + * Using the access-ordered LinkedHashMap variant to leverage the LRU based entry removal + * behaviour provided when then overriding the removeEldestEntry method. + */ + _receivedMsgIDs = new LinkedHashMap<String,Integer>(hashMapSize, 1.0f, true) + { + //Control the max size of the map using LRU based removal upon insertion. + protected boolean removeEldestEntry(Map.Entry<String,Integer> eldest) + { + boolean remove = size() > _capacity; + + // If the supplied entry is to be removed, also remove its associated + // delivery tag + if(remove) + { + String msgId = eldest.getKey(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Removing delivery count records for message : " + msgId); + } + + synchronized (DeliveryCountTracker.this) + { + //Also remove the message information from the deliveryTag map. + if(msgId != null) + { + _jmsIDtoDeliverTag.remove(msgId); + } + } + } + + return remove; + } + }; + } + + /** + * Record sighting of a particular JMSMessageID, with the given deliveryTag. + * + * @param msgID the JMSMessageID of the message to track + * @param deliveryTag the delivery tag of the most recent encounter of the message + */ + public synchronized void recordMessage(String msgID, long deliveryTag) + { + try + { + if(msgID == null) + { + //we can't distinguish between different + //messages without a JMSMessageID, so skip + return; + } + + _jmsIDtoDeliverTag.put(msgID, deliveryTag); + + Integer count = _receivedMsgIDs.get(msgID); + + if(count != null) + { + ++count; + if (_logger.isDebugEnabled()) + { + _logger.debug("Incrementing count for JMSMessageID: '" + msgID + "', value now: " + count); + } + _receivedMsgIDs.put(msgID, count); + } + else + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Recording first sighting of JMSMessageID '" + msgID + "'"); + } + _receivedMsgIDs.put(msgID, 1); + } + } + catch(Exception e) + { + _logger.warn("Exception recording delivery count for message: " + msgID, e); + } + } + + /** + * Returns the number of times the message related to the given delivery tag has been seen + * + * @param deliveryTag delivery tag of the message to retrieve the delivery count for + * @return the delivery count for that message, or 0 if there is no count known + */ + public synchronized int getDeliveryCount(long deliveryTag) + { + String key = (String) _jmsIDtoDeliverTag.getKey(deliveryTag); + + int count = 0; + + if (key != null) + { + Integer val = _receivedMsgIDs.get(key); + if (val != null) + { + count = val; + } + } + + return count; + } + + /** + * Removes both JMSMessageID and count related records associated with the given deliveryTag if any such records exist. + * @param deliveryTag the current tag of the message for which the JMSMessageID and count records should be removed + */ + public synchronized void removeRecordsForMessage(long deliveryTag) + { + String msgId = (String) _jmsIDtoDeliverTag.removeValue(deliveryTag); + + if (msgId != null) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Removed deliveryTag mapping for ID: '" + msgId + "'"); + } + + Integer count = _receivedMsgIDs.remove(msgId); + if(count != null && _logger.isDebugEnabled()) + { + _logger.debug("Removed count mapping for ID: '" + msgId + "' : " + count); + } + } + } + + /** + * Removes both JMSMessageID and count related records associated with the given deliveryTags if any such records exist. + * @param deliveryTags the current tags of the messages for which the JMSMessageID and count records should be removed + */ + public synchronized void removeRecordsForMessages(List<Long> deliveryTags) + { + if (deliveryTags == null) + { + return; + } + + for(long tag : deliveryTags) + { + removeRecordsForMessage(tag); + } + } +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/DeliveryCountTrackerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/DeliveryCountTrackerTest.java new file mode 100644 index 0000000000..2f3a186d82 --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/DeliveryCountTrackerTest.java @@ -0,0 +1,158 @@ +package org.apache.qpid.client; + +import java.util.ArrayList; + +import junit.framework.TestCase; + +public class DeliveryCountTrackerTest extends TestCase +{ + private DeliveryCountTracker _tracker; + private final int CAPACITY=50; + + protected void setUp() + { + _tracker = new DeliveryCountTracker(CAPACITY); + } + + /** + * Test the LRU based eviction policy of the tracker. Both the process of tracking new sightings of a given + * JMSMessageID and retrieving the existing count will involve accessing an existing record and making it + * the most recently accessed. Commit/Rollback/Recover should remove any messages that can't be seen again + * due to consumption or rejection. Any other messages must be evicted by LRU policy as and when necessary + * to make way for new entries. + * + * Test this by validating that upon tracking one more message than the capacity, the first message count + * is lost. Then access the second message count and insert a further new message occurrence. Verify the + * third message count is removed and not the second message count. + */ + public void testLRUeviction() + { + long id; + + for(id=1; id <= CAPACITY + 1; id ++) + { + _tracker.recordMessage(String.valueOf(id), id); + } + + assertEquals("Count was not as expected. First delivery tag " + + "should have been evicted already:", _tracker.getDeliveryCount(1L), 0L); + + //Retrieve second delivery tag count, ensure it is not zero. + //This will also make it the most recently accessed record. + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(2L), 1L); + + //Add a new record, check that tag 2 remains and tag 3 was evicted. + _tracker.recordMessage(String.valueOf(id), id); + assertEquals("Count was not as expected. Second delivery tag " + + "should NOT have been evicted already:", _tracker.getDeliveryCount(2L), 1L); + assertEquals("Count was not as expected. Third delivery tag " + + "should have been evicted already:", _tracker.getDeliveryCount(3L), 0L); + } + + /** + * Test that once it is known a record can never be useful again it can be successfully removed + * from the records to allow room for new records without causing eviction of information that + * could still be useful. + * + * Fill the tracker with records, ensure the counts are correct and then delete them, and ensure the + * counts get reset. + */ + public void testMessageRecordRemoval() + { + long id; + + for(id=1 ; id <= CAPACITY; id ++) + { + _tracker.recordMessage(String.valueOf(id), id); + } + + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(1L), 1L); + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY/2), 1L); + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY-1), 1L); + + //remove records for first deliveryTag, ensure the others remain as expected + _tracker.removeRecordsForMessage(1L); + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(1L), 0L); + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY/2), 1L); + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY-1), 1L); + + //remove records for next deliveryTag, ensure the others remain as expected + _tracker.removeRecordsForMessage(CAPACITY/2); + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(1L), 0L); + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY/2), 0L); + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY-1), 1L); + + //remove records for next deliveryTag, ensure the others remain as expected + _tracker.removeRecordsForMessage(CAPACITY-1); + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(1L), 0L); + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY/2), 0L); + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY-1), 0L); + + //ensure records for last deliveryTag is still as expected + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY), 1L); + } + + /** + * Test that counts are accurately incremented and associated with the new deliveryTag when + * a message with the same JMSMessageID is encountered again, also ensuring that record of + * the count is no longer returned via the old deliveryTag + */ + public void testCounting() + { + long id; + + for(id=1 ; id <= CAPACITY; id ++) + { + _tracker.recordMessage(String.valueOf(id), id); + } + + //verify all counts are currently 1 + ArrayList<Long> exclusions = new ArrayList<Long>(); + verifyCounts(1,exclusions, CAPACITY); + + //Gather some of the existing JMSMessageIDs and create new deliveryTags for them, which can + //be used to represent receiving the same message again. + String msgId1 = String.valueOf(1L); + long newTag1 = id; + String msgId2 = String.valueOf(CAPACITY/2); + long newTag2 = ++id; + String msgId3 = String.valueOf(CAPACITY); + long newTag3 = ++id; + _tracker.recordMessage(String.valueOf(msgId1), newTag1); + _tracker.recordMessage(String.valueOf(msgId2), newTag2); + _tracker.recordMessage(String.valueOf(msgId3), newTag3); + + //Now check the updated values returned are as expected. + + //entries for delivery tags with value 1,CAPACITY/2,CAPACITY should have just been removed + //because new delivery tags associated with the same JMSMessageID were just recorded. + assertEquals("Count was not as expected.", 0L, _tracker.getDeliveryCount(1)); + assertEquals("Count was not as expected.", 0L, _tracker.getDeliveryCount(CAPACITY/2)); + assertEquals("Count was not as expected.", 0L, _tracker.getDeliveryCount(CAPACITY)); + + //The count for the 'redelivered' messages with new deliveryTag should have increased. + assertEquals("Count was not as expected.", 2L, _tracker.getDeliveryCount(newTag1)); + assertEquals("Count was not as expected.", 2L, _tracker.getDeliveryCount(newTag2)); + assertEquals("Count was not as expected.", 2L, _tracker.getDeliveryCount(newTag3)); + + //all the other delivery tags should remain at 1: + exclusions.add(1L); + exclusions.add((long) CAPACITY/2); + exclusions.add((long) CAPACITY); + exclusions.add(newTag1); + exclusions.add(newTag2); + exclusions.add(newTag3); + verifyCounts(1,exclusions, CAPACITY+3); + } + + private void verifyCounts(long expectedValue, ArrayList<Long> excludeFromCheck, long numValues) + { + for(long id=1 ; id <= numValues; id ++) + { + if (!excludeFromCheck.contains(id)) + { + assertEquals("Count was not as expected for id '" + id + "'.", 1L, _tracker.getDeliveryCount(id)); + } + } + } +} |