summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java207
1 files changed, 93 insertions, 114 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index bea43cc232..8d43bef4ec 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -33,15 +33,11 @@ import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
+
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-import java.util.SortedSet;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -116,29 +112,11 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
*/
protected final int _acknowledgeMode;
- /**
- * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
- */
- private int _outstanding;
-
- /**
- * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding
- * number of msgs >= _prefetchHigh and disabled at < _prefetchLow
- */
- private boolean _dups_ok_acknowledge_send;
-
- /**
- * List of tags delievered, The last of which which should be acknowledged on commit in transaction mode.
- */
- private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
-
- /** The last tag that was "multiple" acknowledged on this session (if transacted) */
- private long _lastAcked;
-
- /** set of tags which have previously been acked; but not part of the multiple ack (transacted mode only) */
- private final SortedSet<Long> _previouslyAcked = new TreeSet<Long>();
+ private int _idMapSize = 100;//TODO: set by configuration
+ private int _maxDeliveryAttempts = 3; //TODO: set by configuration
+ private boolean _maxRedeliverEnabled = true;//TODO set based on above config
- private final Object _commitLock = new Object();
+ final DeliveryCountTracker _tracker = new DeliveryCountTracker(_idMapSize);//TODO
/**
* The thread that was used to call receive(). This is important for being able to interrupt that thread if a
@@ -757,19 +735,69 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
}
+ private void recordMessageID(AbstractJMSMessage message)
+ {
+ if(!isMaxDeliveryCountEnforced())
+ {
+ return;
+ }
+
+ String msgId = null;
+
+ try
+ {
+ msgId = message.getJMSMessageID();
+ if(msgId != null)
+ {
+ _tracker.recordMessage(msgId, message.getDeliveryTag());
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.warn("Exception while retrieving JMSMessageID from message" +
+ " with deliveryTag '" + message.getDeliveryTag() + "': " + e, e);
+ }
+ }
+
+ protected boolean shouldRequeueMessage(long deliveryTag)
+ {
+ if(!isMaxDeliveryCountEnforced())
+ {
+ return true;
+ }
+
+ int count = _tracker.getDeliveryCount(deliveryTag);
+
+ boolean reQueue = count < _maxDeliveryAttempts;
+
+ return reQueue;
+ }
+
void preDeliver(AbstractJMSMessage msg)
{
switch (_acknowledgeMode)
{
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ //fall through
+ case Session.AUTO_ACKNOWLEDGE:
+ if(isMessageListenerSet())
+ {
+ //keep track of JMSMessageIDs handed to the client
+ recordMessageID(msg);
+ _session.setLastAsyncAutoAckDeliveryTag(msg.getDeliveryTag());
+ }
+ break;
case Session.PRE_ACKNOWLEDGE:
_session.acknowledgeMessage(msg.getDeliveryTag(), false);
break;
-
case Session.CLIENT_ACKNOWLEDGE:
// we set the session so that when the user calls acknowledge() it can call the method on session
// to send out the appropriate frame
msg.setAMQSession(_session);
+
+ //keep track of JMSMessageIDs handed to the client
+ recordMessageID(msg);
break;
case Session.SESSION_TRANSACTED:
if (isNoConsume())
@@ -780,18 +808,20 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
{
_session.addDeliveredMessage(msg.getDeliveryTag());
_session.markDirty();
+
+ //keep track of JMSMessageIDs handed to the client
+ recordMessageID(msg);
}
break;
}
}
-
+
void postDeliver(AbstractJMSMessage msg) throws JMSException
{
switch (_acknowledgeMode)
{
-
case Session.CLIENT_ACKNOWLEDGE:
if (isNoConsume())
{
@@ -799,101 +829,26 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
_session.markDirty();
break;
-
case Session.DUPS_OK_ACKNOWLEDGE:
+ //fall through
case Session.AUTO_ACKNOWLEDGE:
// we do not auto ack a message if the application code called recover()
if (!_session.isInRecovery())
{
_session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
- break;
- }
- }
-
-
- /**
- * Acknowledge up to last message delivered (if any). Used when commiting.
- *
- * @return the lastDeliveryTag to acknowledge
- */
- Long getLastDelivered()
- {
- if (!_receivedDeliveryTags.isEmpty())
- {
- Long lastDeliveryTag = _receivedDeliveryTags.poll();
-
- while (!_receivedDeliveryTags.isEmpty())
- {
- lastDeliveryTag = _receivedDeliveryTags.poll();
- }
-
- assert _receivedDeliveryTags.isEmpty();
-
- return lastDeliveryTag;
- }
-
- return null;
- }
-
- /**
- * Acknowledge up to last message delivered (if any). Used when commiting.
- */
- void acknowledgeDelivered()
- {
- synchronized(_commitLock)
- {
- ArrayList<Long> tagsToAck = new ArrayList<Long>();
-
- while (!_receivedDeliveryTags.isEmpty())
- {
- tagsToAck.add(_receivedDeliveryTags.poll());
- }
-
- Collections.sort(tagsToAck);
-
- long prevAcked = _lastAcked;
- long oldAckPoint = -1;
-
- while(oldAckPoint != prevAcked)
- {
- oldAckPoint = prevAcked;
-
- Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
-
- while(tagsToAckIterator.hasNext() && tagsToAckIterator.next() == prevAcked+1)
- {
- tagsToAckIterator.remove();
- prevAcked++;
- }
+ if(isMessageListenerSet())
+ {
+ _session.clearLastAsyncAutoAckDeliveryTag();
- Iterator<Long> previousAckIterator = _previouslyAcked.iterator();
- while(previousAckIterator.hasNext() && previousAckIterator.next() == prevAcked+1)
- {
- previousAckIterator.remove();
- prevAcked++;
+ //explicitly remove records for message, we know they wont be used again
+ removeDeliveryCountRecordsForMessage(msg.getDeliveryTag());
+ }
}
-
- }
- if(prevAcked != _lastAcked)
- {
- _session.acknowledgeMessage(prevAcked, true);
- _lastAcked = prevAcked;
- }
-
- Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
-
- while(tagsToAckIterator.hasNext())
- {
- Long tag = tagsToAckIterator.next();
- _session.acknowledgeMessage(tag, false);
- _previouslyAcked.add(tag);
- }
+ break;
}
}
-
void notifyError(Throwable cause)
{
// synchronized (_closed)
@@ -1019,7 +974,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
iterator.remove();
removed = true;
}
- }
+ }
if (removed && (initialSize == _synchronousQueue.size()))
{
@@ -1072,4 +1027,28 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
public void failedOverPost() {}
+ public boolean isMaxDeliveryCountEnforced()
+ {
+ return _maxRedeliverEnabled;
+ }
+
+ public int getMaxDeliveryAttempts()
+ {
+ return _maxDeliveryAttempts;
+ }
+ public void removeDeliveryCountRecordsForMessage(long deliveryTag)
+ {
+ if(isMaxDeliveryCountEnforced())
+ {
+ _tracker.removeRecordsForMessage(deliveryTag);
+ }
+ }
+
+ public void removeDeliveryCountRecordsForMessages(List<Long> deliveryTags)
+ {
+ if(isMaxDeliveryCountEnforced())
+ {
+ _tracker.removeRecordsForMessages(deliveryTags);
+ }
+ }
}