summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java145
1 files changed, 24 insertions, 121 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 754055ad98..3b807591b0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -37,10 +37,7 @@ import javax.jms.MessageListener;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-import java.util.SortedSet;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -118,29 +115,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
protected final int _acknowledgeMode;
/**
- * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
- */
- private int _outstanding;
-
- /**
- * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding
- * number of msgs >= _prefetchHigh and disabled at < _prefetchLow
- */
- private boolean _dups_ok_acknowledge_send;
-
- /**
* List of tags delievered, The last of which which should be acknowledged on commit in transaction mode.
*/
private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
- /** The last tag that was "multiple" acknowledged on this session (if transacted) */
- private long _lastAcked;
-
- /** set of tags which have previously been acked; but not part of the multiple ack (transacted mode only) */
- private final SortedSet<Long> _previouslyAcked = new TreeSet<Long>();
-
- private final Object _commitLock = new Object();
-
/**
* The thread that was used to call receive(). This is important for being able to interrupt that thread if a
* receive() is in progress.
@@ -290,17 +268,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
}
- protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
- {
- if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
- }
-
- _session.setInRecovery(false);
- preDeliver(jmsMsg);
- }
-
/**
* @param immediate if true then return immediately if the connection is failing over
*
@@ -323,14 +290,14 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
}
- if (!_receiving.compareAndSet(false, true))
+ if (isMessageListenerSet())
{
- throw new javax.jms.IllegalStateException("Another thread is already receiving.");
+ throw new javax.jms.IllegalStateException("A listener has already been set.");
}
- if (isMessageListenerSet())
+ if (!_receiving.compareAndSet(false, true))
{
- throw new javax.jms.IllegalStateException("A listener has already been set.");
+ throw new javax.jms.IllegalStateException("Another thread is already receiving.");
}
_receivingThread = Thread.currentThread();
@@ -409,7 +376,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
- preApplicationProcessing(m);
+ preDeliver(m);
postDeliver(m);
}
return m;
@@ -482,7 +449,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
- preApplicationProcessing(m);
+ preDeliver(m);
postDeliver(m);
}
@@ -734,7 +701,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
{
if (isMessageListenerSet())
{
- preApplicationProcessing(jmsMessage);
+ preDeliver(jmsMessage);
getMessageListener().onMessage(jmsMessage);
postDeliver(jmsMessage);
}
@@ -758,49 +725,42 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
}
- void preDeliver(AbstractJMSMessage msg)
+ protected void preDeliver(AbstractJMSMessage msg)
{
+ _session.setInRecovery(false);
+
switch (_acknowledgeMode)
{
-
case Session.PRE_ACKNOWLEDGE:
_session.acknowledgeMessage(msg.getDeliveryTag(), false);
break;
-
+ case Session.AUTO_ACKNOWLEDGE:
+ //fall through
+ case Session.DUPS_OK_ACKNOWLEDGE:
+ _session.addUnacknowledgedMessage(msg.getDeliveryTag());
+ break;
case Session.CLIENT_ACKNOWLEDGE:
// we set the session so that when the user calls acknowledge() it can call the method on session
// to send out the appropriate frame
msg.setAMQSession(_session);
+ _session.addUnacknowledgedMessage(msg.getDeliveryTag());
+ _session.markDirty();
break;
case Session.SESSION_TRANSACTED:
- if (isNoConsume())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
- else
- {
- _session.addDeliveredMessage(msg.getDeliveryTag());
- _session.markDirty();
- }
-
+ _session.addDeliveredMessage(msg.getDeliveryTag());
+ _session.markDirty();
+ break;
+ case Session.NO_ACKNOWLEDGE:
+ //do nothing.
+ //path used for NO-ACK consumers, and browsers (see constructor).
break;
}
-
}
void postDeliver(AbstractJMSMessage msg)
{
switch (_acknowledgeMode)
{
-
- case Session.CLIENT_ACKNOWLEDGE:
- if (isNoConsume())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
- _session.markDirty();
- break;
-
case Session.DUPS_OK_ACKNOWLEDGE:
case Session.AUTO_ACKNOWLEDGE:
// we do not auto ack a message if the application code called recover()
@@ -838,63 +798,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
return null;
}
- /**
- * Acknowledge up to last message delivered (if any). Used when commiting.
- */
- void acknowledgeDelivered()
- {
- synchronized(_commitLock)
- {
- ArrayList<Long> tagsToAck = new ArrayList<Long>();
-
- while (!_receivedDeliveryTags.isEmpty())
- {
- tagsToAck.add(_receivedDeliveryTags.poll());
- }
-
- Collections.sort(tagsToAck);
-
- long prevAcked = _lastAcked;
- long oldAckPoint = -1;
-
- while(oldAckPoint != prevAcked)
- {
- oldAckPoint = prevAcked;
-
- Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
-
- while(tagsToAckIterator.hasNext() && tagsToAckIterator.next() == prevAcked+1)
- {
- tagsToAckIterator.remove();
- prevAcked++;
- }
-
- Iterator<Long> previousAckIterator = _previouslyAcked.iterator();
- while(previousAckIterator.hasNext() && previousAckIterator.next() == prevAcked+1)
- {
- previousAckIterator.remove();
- prevAcked++;
- }
-
- }
- if(prevAcked != _lastAcked)
- {
- _session.acknowledgeMessage(prevAcked, true);
- _lastAcked = prevAcked;
- }
-
- Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
-
- while(tagsToAckIterator.hasNext())
- {
- Long tag = tagsToAckIterator.next();
- _session.acknowledgeMessage(tag, false);
- _previouslyAcked.add(tag);
- }
- }
- }
-
-
void notifyError(Throwable cause)
{
// synchronized (_closed)
@@ -973,7 +876,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
public boolean isNoConsume()
{
- return _noConsume || _destination.isBrowseOnly() ;
+ return _noConsume;
}
public void rollback()