From 2eaa40c0c778d0bedb7225d850da00ac66c04537 Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Tue, 7 Dec 2010 12:22:54 +0000 Subject: QPID-2971 - onMessage/recieve + recover/rollback handling of Max Delivery Count for 0-8/0-9 consumers git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@1042997 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 114 +++++++++++- .../org/apache/qpid/client/AMQSession_0_8.java | 28 ++- .../apache/qpid/client/BasicMessageConsumer.java | 207 +++++++++------------ .../qpid/client/BasicMessageConsumer_0_10.java | 7 + .../apache/qpid/client/DeliveryCountTracker.java | 196 +++++++++++++++++++ .../qpid/client/DeliveryCountTrackerTest.java | 158 ++++++++++++++++ 6 files changed, 584 insertions(+), 126 deletions(-) create mode 100644 qpid/java/client/src/main/java/org/apache/qpid/client/DeliveryCountTracker.java create mode 100644 qpid/java/client/src/test/java/org/apache/qpid/client/DeliveryCountTrackerTest.java diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 9f5ab79ddf..3a542765ca 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -314,6 +314,9 @@ public abstract class AMQSession _deliveredMessageTags = new ConcurrentLinkedQueue(); + /** The last asynchronously delivered auto-ack message tag */ + protected Long _lastAsyncAutoAckDeliveryTag = null; + /** Holds the dispatcher thread for this session. */ protected Dispatcher _dispatcher; @@ -821,7 +824,8 @@ public abstract class AMQSession ackedMessageTags = new ArrayList(); + // Acknowledge all delivered messages while (true) { @@ -832,10 +836,17 @@ public abstract class AMQSession consumersToCheck = new ArrayList(_consumers.values()); + Iterator iter = consumersToCheck.iterator(); + if(!iter.hasNext()) + { + return; + } + + //remove any consumers not enforcing MaxDelivery + while(iter.hasNext()) + { + C con = iter.next(); + if(!con.isMaxDeliveryCountEnforced()) + { + iter.remove(); + } + } + + if (consumersToCheck.size() > 0) + { + //reject(false) any messages we don't want returned again + switch(_acknowledgeMode) + { + case Session.CLIENT_ACKNOWLEDGE: + for(long tag : _unacknowledgedMessageTags) + { + for(C consumer : consumersToCheck) + { + if(!consumer.shouldRequeueMessage(tag)) + { + //consumer said we should not requeue the message, do reject(false) + rejectMessage(tag, false); + + //explicitly remove records for message, we know they wont be used again + consumer.removeDeliveryCountRecordsForMessage(tag); + //no need to check other consumers now + break; + } + } + } + break; + case Session.DUPS_OK_ACKNOWLEDGE: + //fall through + case Session.AUTO_ACKNOWLEDGE: + //check the last message asynchronously delivered via auto-ack + Long tag = getLastAsyncAutoAckDeliveryTag(); + clearLastAsyncAutoAckDeliveryTag(); + + if(tag != null) + { + for(C consumer : consumersToCheck) + { + if(consumer.isMessageListenerSet() && !consumer.shouldRequeueMessage(tag)) + { + //consumer said we should not requeue the message, do reject(false) + rejectMessage(tag, false); + + //explicitly remove records for message, we know they wont be used again + consumer.removeDeliveryCountRecordsForMessage(tag); + //no need to check other consumers now + break; + } + } + } + break; + } + } + } + protected abstract void sendRecover() throws AMQException, FailoverException; public void rejectMessage(UnprocessedMessage message, boolean requeue) @@ -2046,7 +2129,7 @@ public abstract class AMQSession extends Closeable implements Messa */ protected final int _acknowledgeMode; - /** - * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode - */ - private int _outstanding; - - /** - * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding - * number of msgs >= _prefetchHigh and disabled at < _prefetchLow - */ - private boolean _dups_ok_acknowledge_send; - - /** - * List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. - */ - private ConcurrentLinkedQueue _receivedDeliveryTags = new ConcurrentLinkedQueue(); - - /** The last tag that was "multiple" acknowledged on this session (if transacted) */ - private long _lastAcked; - - /** set of tags which have previously been acked; but not part of the multiple ack (transacted mode only) */ - private final SortedSet _previouslyAcked = new TreeSet(); + private int _idMapSize = 100;//TODO: set by configuration + private int _maxDeliveryAttempts = 3; //TODO: set by configuration + private boolean _maxRedeliverEnabled = true;//TODO set based on above config - private final Object _commitLock = new Object(); + final DeliveryCountTracker _tracker = new DeliveryCountTracker(_idMapSize);//TODO /** * The thread that was used to call receive(). This is important for being able to interrupt that thread if a @@ -757,19 +735,69 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa } } + private void recordMessageID(AbstractJMSMessage message) + { + if(!isMaxDeliveryCountEnforced()) + { + return; + } + + String msgId = null; + + try + { + msgId = message.getJMSMessageID(); + if(msgId != null) + { + _tracker.recordMessage(msgId, message.getDeliveryTag()); + } + } + catch (JMSException e) + { + _logger.warn("Exception while retrieving JMSMessageID from message" + + " with deliveryTag '" + message.getDeliveryTag() + "': " + e, e); + } + } + + protected boolean shouldRequeueMessage(long deliveryTag) + { + if(!isMaxDeliveryCountEnforced()) + { + return true; + } + + int count = _tracker.getDeliveryCount(deliveryTag); + + boolean reQueue = count < _maxDeliveryAttempts; + + return reQueue; + } + void preDeliver(AbstractJMSMessage msg) { switch (_acknowledgeMode) { + case Session.DUPS_OK_ACKNOWLEDGE: + //fall through + case Session.AUTO_ACKNOWLEDGE: + if(isMessageListenerSet()) + { + //keep track of JMSMessageIDs handed to the client + recordMessageID(msg); + _session.setLastAsyncAutoAckDeliveryTag(msg.getDeliveryTag()); + } + break; case Session.PRE_ACKNOWLEDGE: _session.acknowledgeMessage(msg.getDeliveryTag(), false); break; - case Session.CLIENT_ACKNOWLEDGE: // we set the session so that when the user calls acknowledge() it can call the method on session // to send out the appropriate frame msg.setAMQSession(_session); + + //keep track of JMSMessageIDs handed to the client + recordMessageID(msg); break; case Session.SESSION_TRANSACTED: if (isNoConsume()) @@ -780,18 +808,20 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa { _session.addDeliveredMessage(msg.getDeliveryTag()); _session.markDirty(); + + //keep track of JMSMessageIDs handed to the client + recordMessageID(msg); } break; } } - + void postDeliver(AbstractJMSMessage msg) throws JMSException { switch (_acknowledgeMode) { - case Session.CLIENT_ACKNOWLEDGE: if (isNoConsume()) { @@ -799,101 +829,26 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa } _session.markDirty(); break; - case Session.DUPS_OK_ACKNOWLEDGE: + //fall through case Session.AUTO_ACKNOWLEDGE: // we do not auto ack a message if the application code called recover() if (!_session.isInRecovery()) { _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } - break; - } - } - - - /** - * Acknowledge up to last message delivered (if any). Used when commiting. - * - * @return the lastDeliveryTag to acknowledge - */ - Long getLastDelivered() - { - if (!_receivedDeliveryTags.isEmpty()) - { - Long lastDeliveryTag = _receivedDeliveryTags.poll(); - - while (!_receivedDeliveryTags.isEmpty()) - { - lastDeliveryTag = _receivedDeliveryTags.poll(); - } - - assert _receivedDeliveryTags.isEmpty(); - - return lastDeliveryTag; - } - - return null; - } - - /** - * Acknowledge up to last message delivered (if any). Used when commiting. - */ - void acknowledgeDelivered() - { - synchronized(_commitLock) - { - ArrayList tagsToAck = new ArrayList(); - - while (!_receivedDeliveryTags.isEmpty()) - { - tagsToAck.add(_receivedDeliveryTags.poll()); - } - - Collections.sort(tagsToAck); - - long prevAcked = _lastAcked; - long oldAckPoint = -1; - - while(oldAckPoint != prevAcked) - { - oldAckPoint = prevAcked; - - Iterator tagsToAckIterator = tagsToAck.iterator(); - - while(tagsToAckIterator.hasNext() && tagsToAckIterator.next() == prevAcked+1) - { - tagsToAckIterator.remove(); - prevAcked++; - } + if(isMessageListenerSet()) + { + _session.clearLastAsyncAutoAckDeliveryTag(); - Iterator previousAckIterator = _previouslyAcked.iterator(); - while(previousAckIterator.hasNext() && previousAckIterator.next() == prevAcked+1) - { - previousAckIterator.remove(); - prevAcked++; + //explicitly remove records for message, we know they wont be used again + removeDeliveryCountRecordsForMessage(msg.getDeliveryTag()); + } } - - } - if(prevAcked != _lastAcked) - { - _session.acknowledgeMessage(prevAcked, true); - _lastAcked = prevAcked; - } - - Iterator tagsToAckIterator = tagsToAck.iterator(); - - while(tagsToAckIterator.hasNext()) - { - Long tag = tagsToAckIterator.next(); - _session.acknowledgeMessage(tag, false); - _previouslyAcked.add(tag); - } + break; } } - void notifyError(Throwable cause) { // synchronized (_closed) @@ -1019,7 +974,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa iterator.remove(); removed = true; } - } + } if (removed && (initialSize == _synchronousQueue.size())) { @@ -1072,4 +1027,28 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa public void failedOverPost() {} + public boolean isMaxDeliveryCountEnforced() + { + return _maxRedeliverEnabled; + } + + public int getMaxDeliveryAttempts() + { + return _maxDeliveryAttempts; + } + public void removeDeliveryCountRecordsForMessage(long deliveryTag) + { + if(isMaxDeliveryCountEnforced()) + { + _tracker.removeRecordsForMessage(deliveryTag); + } + } + + public void removeDeliveryCountRecordsForMessages(List deliveryTags) + { + if(isMaxDeliveryCountEnforced()) + { + _tracker.removeRecordsForMessages(deliveryTags); + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 1b2d6876bd..ffd3957b38 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -457,4 +457,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer _receivedMsgIDs; + + private int _capacity; + + /** + * Creates a new DeliveryCountTracker instance. + * + * @param capacity the number of records to track. + * @throws IllegalArgumentException if specified capacity not > 0 + */ + public DeliveryCountTracker(int capacity) throws IllegalArgumentException + { + if(capacity <= 0) + { + throw new IllegalArgumentException("Specified capacity must be greater than 0."); + } + _capacity = capacity; + + /* + * HashMap Javadoc states: "If the initial capacity is greater than the maximum number + * of entries divided by the load factor, no rehash operations will ever occur." + * + * Specifying an additional 5% size at construction with a 1.0 load factor to pre-allocate + * the entries, bound the max map size, and avoid size increases + associated rehashing. + */ + int hashMapSize = (int)(_capacity * 1.05f); + + /* + * Using the access-ordered LinkedHashMap variant to leverage the LRU based entry removal + * behaviour provided when then overriding the removeEldestEntry method. + */ + _receivedMsgIDs = new LinkedHashMap(hashMapSize, 1.0f, true) + { + //Control the max size of the map using LRU based removal upon insertion. + protected boolean removeEldestEntry(Map.Entry eldest) + { + boolean remove = size() > _capacity; + + // If the supplied entry is to be removed, also remove its associated + // delivery tag + if(remove) + { + String msgId = eldest.getKey(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Removing delivery count records for message : " + msgId); + } + + synchronized (DeliveryCountTracker.this) + { + //Also remove the message information from the deliveryTag map. + if(msgId != null) + { + _jmsIDtoDeliverTag.remove(msgId); + } + } + } + + return remove; + } + }; + } + + /** + * Record sighting of a particular JMSMessageID, with the given deliveryTag. + * + * @param msgID the JMSMessageID of the message to track + * @param deliveryTag the delivery tag of the most recent encounter of the message + */ + public synchronized void recordMessage(String msgID, long deliveryTag) + { + try + { + if(msgID == null) + { + //we can't distinguish between different + //messages without a JMSMessageID, so skip + return; + } + + _jmsIDtoDeliverTag.put(msgID, deliveryTag); + + Integer count = _receivedMsgIDs.get(msgID); + + if(count != null) + { + ++count; + if (_logger.isDebugEnabled()) + { + _logger.debug("Incrementing count for JMSMessageID: '" + msgID + "', value now: " + count); + } + _receivedMsgIDs.put(msgID, count); + } + else + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Recording first sighting of JMSMessageID '" + msgID + "'"); + } + _receivedMsgIDs.put(msgID, 1); + } + } + catch(Exception e) + { + _logger.warn("Exception recording delivery count for message: " + msgID, e); + } + } + + /** + * Returns the number of times the message related to the given delivery tag has been seen + * + * @param deliveryTag delivery tag of the message to retrieve the delivery count for + * @return the delivery count for that message, or 0 if there is no count known + */ + public synchronized int getDeliveryCount(long deliveryTag) + { + String key = (String) _jmsIDtoDeliverTag.getKey(deliveryTag); + + int count = 0; + + if (key != null) + { + Integer val = _receivedMsgIDs.get(key); + if (val != null) + { + count = val; + } + } + + return count; + } + + /** + * Removes both JMSMessageID and count related records associated with the given deliveryTag if any such records exist. + * @param deliveryTag the current tag of the message for which the JMSMessageID and count records should be removed + */ + public synchronized void removeRecordsForMessage(long deliveryTag) + { + String msgId = (String) _jmsIDtoDeliverTag.removeValue(deliveryTag); + + if (msgId != null) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Removed deliveryTag mapping for ID: '" + msgId + "'"); + } + + Integer count = _receivedMsgIDs.remove(msgId); + if(count != null && _logger.isDebugEnabled()) + { + _logger.debug("Removed count mapping for ID: '" + msgId + "' : " + count); + } + } + } + + /** + * Removes both JMSMessageID and count related records associated with the given deliveryTags if any such records exist. + * @param deliveryTags the current tags of the messages for which the JMSMessageID and count records should be removed + */ + public synchronized void removeRecordsForMessages(List deliveryTags) + { + if (deliveryTags == null) + { + return; + } + + for(long tag : deliveryTags) + { + removeRecordsForMessage(tag); + } + } +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/DeliveryCountTrackerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/DeliveryCountTrackerTest.java new file mode 100644 index 0000000000..2f3a186d82 --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/DeliveryCountTrackerTest.java @@ -0,0 +1,158 @@ +package org.apache.qpid.client; + +import java.util.ArrayList; + +import junit.framework.TestCase; + +public class DeliveryCountTrackerTest extends TestCase +{ + private DeliveryCountTracker _tracker; + private final int CAPACITY=50; + + protected void setUp() + { + _tracker = new DeliveryCountTracker(CAPACITY); + } + + /** + * Test the LRU based eviction policy of the tracker. Both the process of tracking new sightings of a given + * JMSMessageID and retrieving the existing count will involve accessing an existing record and making it + * the most recently accessed. Commit/Rollback/Recover should remove any messages that can't be seen again + * due to consumption or rejection. Any other messages must be evicted by LRU policy as and when necessary + * to make way for new entries. + * + * Test this by validating that upon tracking one more message than the capacity, the first message count + * is lost. Then access the second message count and insert a further new message occurrence. Verify the + * third message count is removed and not the second message count. + */ + public void testLRUeviction() + { + long id; + + for(id=1; id <= CAPACITY + 1; id ++) + { + _tracker.recordMessage(String.valueOf(id), id); + } + + assertEquals("Count was not as expected. First delivery tag " + + "should have been evicted already:", _tracker.getDeliveryCount(1L), 0L); + + //Retrieve second delivery tag count, ensure it is not zero. + //This will also make it the most recently accessed record. + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(2L), 1L); + + //Add a new record, check that tag 2 remains and tag 3 was evicted. + _tracker.recordMessage(String.valueOf(id), id); + assertEquals("Count was not as expected. Second delivery tag " + + "should NOT have been evicted already:", _tracker.getDeliveryCount(2L), 1L); + assertEquals("Count was not as expected. Third delivery tag " + + "should have been evicted already:", _tracker.getDeliveryCount(3L), 0L); + } + + /** + * Test that once it is known a record can never be useful again it can be successfully removed + * from the records to allow room for new records without causing eviction of information that + * could still be useful. + * + * Fill the tracker with records, ensure the counts are correct and then delete them, and ensure the + * counts get reset. + */ + public void testMessageRecordRemoval() + { + long id; + + for(id=1 ; id <= CAPACITY; id ++) + { + _tracker.recordMessage(String.valueOf(id), id); + } + + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(1L), 1L); + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY/2), 1L); + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY-1), 1L); + + //remove records for first deliveryTag, ensure the others remain as expected + _tracker.removeRecordsForMessage(1L); + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(1L), 0L); + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY/2), 1L); + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY-1), 1L); + + //remove records for next deliveryTag, ensure the others remain as expected + _tracker.removeRecordsForMessage(CAPACITY/2); + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(1L), 0L); + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY/2), 0L); + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY-1), 1L); + + //remove records for next deliveryTag, ensure the others remain as expected + _tracker.removeRecordsForMessage(CAPACITY-1); + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(1L), 0L); + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY/2), 0L); + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY-1), 0L); + + //ensure records for last deliveryTag is still as expected + assertEquals("Count was not as expected.", _tracker.getDeliveryCount(CAPACITY), 1L); + } + + /** + * Test that counts are accurately incremented and associated with the new deliveryTag when + * a message with the same JMSMessageID is encountered again, also ensuring that record of + * the count is no longer returned via the old deliveryTag + */ + public void testCounting() + { + long id; + + for(id=1 ; id <= CAPACITY; id ++) + { + _tracker.recordMessage(String.valueOf(id), id); + } + + //verify all counts are currently 1 + ArrayList exclusions = new ArrayList(); + verifyCounts(1,exclusions, CAPACITY); + + //Gather some of the existing JMSMessageIDs and create new deliveryTags for them, which can + //be used to represent receiving the same message again. + String msgId1 = String.valueOf(1L); + long newTag1 = id; + String msgId2 = String.valueOf(CAPACITY/2); + long newTag2 = ++id; + String msgId3 = String.valueOf(CAPACITY); + long newTag3 = ++id; + _tracker.recordMessage(String.valueOf(msgId1), newTag1); + _tracker.recordMessage(String.valueOf(msgId2), newTag2); + _tracker.recordMessage(String.valueOf(msgId3), newTag3); + + //Now check the updated values returned are as expected. + + //entries for delivery tags with value 1,CAPACITY/2,CAPACITY should have just been removed + //because new delivery tags associated with the same JMSMessageID were just recorded. + assertEquals("Count was not as expected.", 0L, _tracker.getDeliveryCount(1)); + assertEquals("Count was not as expected.", 0L, _tracker.getDeliveryCount(CAPACITY/2)); + assertEquals("Count was not as expected.", 0L, _tracker.getDeliveryCount(CAPACITY)); + + //The count for the 'redelivered' messages with new deliveryTag should have increased. + assertEquals("Count was not as expected.", 2L, _tracker.getDeliveryCount(newTag1)); + assertEquals("Count was not as expected.", 2L, _tracker.getDeliveryCount(newTag2)); + assertEquals("Count was not as expected.", 2L, _tracker.getDeliveryCount(newTag3)); + + //all the other delivery tags should remain at 1: + exclusions.add(1L); + exclusions.add((long) CAPACITY/2); + exclusions.add((long) CAPACITY); + exclusions.add(newTag1); + exclusions.add(newTag2); + exclusions.add(newTag3); + verifyCounts(1,exclusions, CAPACITY+3); + } + + private void verifyCounts(long expectedValue, ArrayList excludeFromCheck, long numValues) + { + for(long id=1 ; id <= numValues; id ++) + { + if (!excludeFromCheck.contains(id)) + { + assertEquals("Count was not as expected for id '" + id + "'.", 1L, _tracker.getDeliveryCount(id)); + } + } + } +} -- cgit v1.2.1