diff options
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java')
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java | 207 |
1 files changed, 93 insertions, 114 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index bea43cc232..8d43bef4ec 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -33,15 +33,11 @@ import org.slf4j.LoggerFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; + import java.util.Arrays; import java.util.Iterator; import java.util.List; -import java.util.SortedSet; -import java.util.ArrayList; -import java.util.Collections; -import java.util.TreeSet; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -116,29 +112,11 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa */ protected final int _acknowledgeMode; - /** - * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode - */ - private int _outstanding; - - /** - * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding - * number of msgs >= _prefetchHigh and disabled at < _prefetchLow - */ - private boolean _dups_ok_acknowledge_send; - - /** - * List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. - */ - private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>(); - - /** The last tag that was "multiple" acknowledged on this session (if transacted) */ - private long _lastAcked; - - /** set of tags which have previously been acked; but not part of the multiple ack (transacted mode only) */ - private final SortedSet<Long> _previouslyAcked = new TreeSet<Long>(); + private int _idMapSize = 100;//TODO: set by configuration + private int _maxDeliveryAttempts = 3; //TODO: set by configuration + private boolean _maxRedeliverEnabled = true;//TODO set based on above config - private final Object _commitLock = new Object(); + final DeliveryCountTracker _tracker = new DeliveryCountTracker(_idMapSize);//TODO /** * The thread that was used to call receive(). This is important for being able to interrupt that thread if a @@ -757,19 +735,69 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa } } + private void recordMessageID(AbstractJMSMessage message) + { + if(!isMaxDeliveryCountEnforced()) + { + return; + } + + String msgId = null; + + try + { + msgId = message.getJMSMessageID(); + if(msgId != null) + { + _tracker.recordMessage(msgId, message.getDeliveryTag()); + } + } + catch (JMSException e) + { + _logger.warn("Exception while retrieving JMSMessageID from message" + + " with deliveryTag '" + message.getDeliveryTag() + "': " + e, e); + } + } + + protected boolean shouldRequeueMessage(long deliveryTag) + { + if(!isMaxDeliveryCountEnforced()) + { + return true; + } + + int count = _tracker.getDeliveryCount(deliveryTag); + + boolean reQueue = count < _maxDeliveryAttempts; + + return reQueue; + } + void preDeliver(AbstractJMSMessage msg) { switch (_acknowledgeMode) { + case Session.DUPS_OK_ACKNOWLEDGE: + //fall through + case Session.AUTO_ACKNOWLEDGE: + if(isMessageListenerSet()) + { + //keep track of JMSMessageIDs handed to the client + recordMessageID(msg); + _session.setLastAsyncAutoAckDeliveryTag(msg.getDeliveryTag()); + } + break; case Session.PRE_ACKNOWLEDGE: _session.acknowledgeMessage(msg.getDeliveryTag(), false); break; - case Session.CLIENT_ACKNOWLEDGE: // we set the session so that when the user calls acknowledge() it can call the method on session // to send out the appropriate frame msg.setAMQSession(_session); + + //keep track of JMSMessageIDs handed to the client + recordMessageID(msg); break; case Session.SESSION_TRANSACTED: if (isNoConsume()) @@ -780,18 +808,20 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa { _session.addDeliveredMessage(msg.getDeliveryTag()); _session.markDirty(); + + //keep track of JMSMessageIDs handed to the client + recordMessageID(msg); } break; } } - + void postDeliver(AbstractJMSMessage msg) throws JMSException { switch (_acknowledgeMode) { - case Session.CLIENT_ACKNOWLEDGE: if (isNoConsume()) { @@ -799,101 +829,26 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa } _session.markDirty(); break; - case Session.DUPS_OK_ACKNOWLEDGE: + //fall through case Session.AUTO_ACKNOWLEDGE: // we do not auto ack a message if the application code called recover() if (!_session.isInRecovery()) { _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } - break; - } - } - - - /** - * Acknowledge up to last message delivered (if any). Used when commiting. - * - * @return the lastDeliveryTag to acknowledge - */ - Long getLastDelivered() - { - if (!_receivedDeliveryTags.isEmpty()) - { - Long lastDeliveryTag = _receivedDeliveryTags.poll(); - - while (!_receivedDeliveryTags.isEmpty()) - { - lastDeliveryTag = _receivedDeliveryTags.poll(); - } - - assert _receivedDeliveryTags.isEmpty(); - - return lastDeliveryTag; - } - - return null; - } - - /** - * Acknowledge up to last message delivered (if any). Used when commiting. - */ - void acknowledgeDelivered() - { - synchronized(_commitLock) - { - ArrayList<Long> tagsToAck = new ArrayList<Long>(); - - while (!_receivedDeliveryTags.isEmpty()) - { - tagsToAck.add(_receivedDeliveryTags.poll()); - } - - Collections.sort(tagsToAck); - - long prevAcked = _lastAcked; - long oldAckPoint = -1; - - while(oldAckPoint != prevAcked) - { - oldAckPoint = prevAcked; - - Iterator<Long> tagsToAckIterator = tagsToAck.iterator(); - - while(tagsToAckIterator.hasNext() && tagsToAckIterator.next() == prevAcked+1) - { - tagsToAckIterator.remove(); - prevAcked++; - } + if(isMessageListenerSet()) + { + _session.clearLastAsyncAutoAckDeliveryTag(); - Iterator<Long> previousAckIterator = _previouslyAcked.iterator(); - while(previousAckIterator.hasNext() && previousAckIterator.next() == prevAcked+1) - { - previousAckIterator.remove(); - prevAcked++; + //explicitly remove records for message, we know they wont be used again + removeDeliveryCountRecordsForMessage(msg.getDeliveryTag()); + } } - - } - if(prevAcked != _lastAcked) - { - _session.acknowledgeMessage(prevAcked, true); - _lastAcked = prevAcked; - } - - Iterator<Long> tagsToAckIterator = tagsToAck.iterator(); - - while(tagsToAckIterator.hasNext()) - { - Long tag = tagsToAckIterator.next(); - _session.acknowledgeMessage(tag, false); - _previouslyAcked.add(tag); - } + break; } } - void notifyError(Throwable cause) { // synchronized (_closed) @@ -1019,7 +974,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa iterator.remove(); removed = true; } - } + } if (removed && (initialSize == _synchronousQueue.size())) { @@ -1072,4 +1027,28 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa public void failedOverPost() {} + public boolean isMaxDeliveryCountEnforced() + { + return _maxRedeliverEnabled; + } + + public int getMaxDeliveryAttempts() + { + return _maxDeliveryAttempts; + } + public void removeDeliveryCountRecordsForMessage(long deliveryTag) + { + if(isMaxDeliveryCountEnforced()) + { + _tracker.removeRecordsForMessage(deliveryTag); + } + } + + public void removeDeliveryCountRecordsForMessages(List<Long> deliveryTags) + { + if(isMaxDeliveryCountEnforced()) + { + _tracker.removeRecordsForMessages(deliveryTags); + } + } } |