summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-18 09:04:09 +0000
committerRobert Greig <rgreig@apache.org>2006-12-18 09:04:09 +0000
commitbe115e3b33e6319047e8cdc3b913f14f98aa4b23 (patch)
tree20a5b1552b03e591f3c8b7ebd3e7962bcb3c46a2 /java
parent1aca5ed069697d2aa0eb743e53e787f79a299902 (diff)
downloadqpid-python-be115e3b33e6319047e8cdc3b913f14f98aa4b23.tar.gz
QPID-209 : Patch supplied by Rob Godfrey - Fix acknowledge so it only acknowledges messages that have actually been consumed
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@488159 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java39
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java48
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java10
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java41
4 files changed, 108 insertions, 30 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 70a5a1efeb..5dee3c1266 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -104,7 +104,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/**
* Maps from consumer tag (String) to JMSMessageConsumer instance
*/
- private Map _consumers = new ConcurrentHashMap();
+ private Map<String, BasicMessageConsumer> _consumers = new ConcurrentHashMap<String, BasicMessageConsumer>();
/**
* Maps from destination to count of JMSMessageConsumers
@@ -138,7 +138,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private volatile AtomicBoolean _stopped = new AtomicBoolean(true);
- private final AtomicLong _lastDeliveryTag = new AtomicLong();
+
/**
@@ -174,7 +174,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
if (message.deliverBody != null)
{
- final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.deliverBody.consumerTag);
+ final BasicMessageConsumer consumer = _consumers.get(message.deliverBody.consumerTag);
if (consumer == null)
{
@@ -184,7 +184,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
else
{
-
+
consumer.notifyMessage(message, _channelId);
}
@@ -467,10 +467,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
// Acknowledge up to message last delivered (if any) for each consumer.
//need to send ack for messages delivered to consumers so far
- for (Iterator i = _consumers.values().iterator(); i.hasNext();)
+ for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
{
//Sends acknowledgement to server
- ((BasicMessageConsumer) i.next()).acknowledgeLastDelivered();
+ i.next().acknowledgeLastDelivered();
}
// Commits outstanding messages sent and outstanding acknowledgements.
@@ -652,12 +652,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
// we need to clone the list of consumers since the close() method updates the _consumers collection
// which would result in a concurrent modification exception
- final ArrayList clonedConsumers = new ArrayList(_consumers.values());
+ final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList(_consumers.values());
- final Iterator it = clonedConsumers.iterator();
+ final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator();
while (it.hasNext())
{
- final BasicMessageConsumer con = (BasicMessageConsumer) it.next();
+ final BasicMessageConsumer con = it.next();
if (error != null)
{
con.notifyError(error);
@@ -678,12 +678,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
// we need to clone the list of consumers since the close() method updates the _consumers collection
// which would result in a concurrent modification exception
- final ArrayList clonedConsumers = new ArrayList(_consumers.values());
+ final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values());
- final Iterator it = clonedConsumers.iterator();
+ final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator();
while (it.hasNext())
{
- final BasicMessageConsumer con = (BasicMessageConsumer) it.next();
+ final BasicMessageConsumer con = it.next();
con.markClosed();
}
// at this point the _consumers map will be empty
@@ -702,27 +702,22 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false));
}
-
public void acknowledge() throws JMSException
{
- if (getAMQConnection().isClosed())
+ if(isClosed())
{
- throw new javax.jms.IllegalStateException("Connection is already closed");
+ throw new IllegalStateException("Session is already closed");
}
- if (isClosed())
+ for(BasicMessageConsumer consumer : _consumers.values())
{
- throw new javax.jms.IllegalStateException("Session is already closed");
+ consumer.acknowledge();
}
- acknowledgeMessage(_lastDeliveryTag.get(), true);
- }
- void setLastDeliveredMessage(AbstractJMSMessage message)
- {
- _lastDeliveryTag.set(message.getDeliveryTag());
}
+
public MessageListener getMessageListener() throws JMSException
{
checkNotClosed();
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 9f9038fddd..a94ce09b8e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -38,12 +38,17 @@ import javax.jms.Message;
import javax.jms.MessageListener;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.Iterator;
public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
- private static final Logger _logger = Logger.getLogger(BasicMessageConsumer.class);
+ private static final Logger _logger = Logger.getLogger(BasicMessageConsumer.class);
/**
* The connection being used by this consumer
@@ -80,7 +85,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
* Used in the blocking receive methods to receive a message from
* the Session thread. Argument true indicates we want strict FIFO semantics
*/
- private final SynchronousQueue _synchronousQueue = new SynchronousQueue(true);
+ private final ArrayBlockingQueue _synchronousQueue;
private MessageFactoryRegistry _messageFactory;
@@ -132,6 +137,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
*/
private boolean _dups_ok_acknowledge_send;
+ private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>();
+
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector,
boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable,
@@ -150,6 +157,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_prefetchLow = prefetchLow;
_exclusive = exclusive;
_acknowledgeMode = acknowledgeMode;
+ _synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true);
}
public AMQDestination getDestination()
@@ -217,7 +225,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
AbstractJMSMessage jmsMsg = (AbstractJMSMessage)_synchronousQueue.poll();
if (jmsMsg != null)
{
- _session.setLastDeliveredMessage(jmsMsg);
+ preApplicationProcessing(jmsMsg);
messageListener.onMessage(jmsMsg);
postDeliver(jmsMsg);
}
@@ -225,6 +233,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
+ private void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
+ {
+ if(_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ {
+ _unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag());
+ }
+ }
+
private void acquireReceiving() throws JMSException
{
if (!_receiving.compareAndSet(false, true))
@@ -297,7 +313,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
- _session.setLastDeliveredMessage(m);
+ preApplicationProcessing(m);
postDeliver(m);
}
@@ -326,7 +342,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
- _session.setLastDeliveredMessage(m);
+ preApplicationProcessing(m);
postDeliver(m);
}
@@ -385,6 +401,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
deregisterConsumer();
+ _unacknowledgedDeliveryTags.clear();
}
}
}
@@ -421,6 +438,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
messageFrame.bodies);
_logger.debug("Message is of type: " + jmsMessage.getClass().getName());
+ jmsMessage.setConsumer(this);
preDeliver(jmsMessage);
@@ -428,7 +446,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
//we do not need a lock around the test above, and the dispatch below as it is invalid
//for an application to alter an installed listener while the session is started
- _session.setLastDeliveredMessage(jmsMessage);
+ preApplicationProcessing(jmsMessage);
getMessageListener().onMessage(jmsMessage);
postDeliver(jmsMessage);
}
@@ -554,4 +572,22 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
throw new javax.jms.IllegalStateException("Invalid Session");
}
}
+
+ public void acknowledge() throws JMSException
+ {
+ if(!isClosed())
+ {
+
+ Iterator<Long> tags = _unacknowledgedDeliveryTags.iterator();
+ while(tags.hasNext())
+ {
+ _session.acknowledgeMessage(tags.next(), false);
+ tags.remove();
+ }
+ }
+ else
+ {
+ throw new IllegalStateException("Consumer is closed");
+ }
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index aaf7fa41bf..23d6c0151e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -26,7 +26,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.*;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
@@ -48,7 +48,8 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
private boolean _readableProperties = false;
private boolean _readableMessage = false;
private Destination _destination;
-
+ private BasicMessageConsumer _consumer;
+
protected AbstractJMSMessage(ByteBuffer data)
{
super(new BasicContentHeaderProperties());
@@ -533,4 +534,9 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
_readableMessage = true;
}
}
+
+ public void setConsumer(BasicMessageConsumer basicMessageConsumer)
+ {
+ _consumer = basicMessageConsumer;
+ }
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
index 9ff02c3b71..817fcfb9e8 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
@@ -169,7 +169,48 @@ public class RecoverTest extends TestCase
con.close();
}
+ public void testAcknowledgePerConsumer() throws Exception
+ {
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+
+ Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = new AMQQueue("Q1", "Q1", false, true);
+ Queue queue2 = new AMQQueue("Q2", "Q2", false, true);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+ MessageConsumer consumer2 = consumerSession.createConsumer(queue2);
+ //force synch to ensure the consumer has resulted in a bound queue
+ ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(queue);
+ MessageProducer producer2 = producerSession.createProducer(queue2);
+
+ producer.send(producerSession.createTextMessage("msg1"));
+ producer2.send(producerSession.createTextMessage("msg2"));
+
+ con2.close();
+
+ _logger.info("Starting connection");
+ con.start();
+
+ TextMessage tm2 = (TextMessage) consumer2.receive();
+ assertNotNull(tm2);
+ assertEquals("msg2",tm2.getText());
+
+ tm2.acknowledge();
+
+ consumerSession.recover();
+
+ TextMessage tm1 = (TextMessage) consumer.receive(2000);
+ assertNotNull(tm1);
+ assertEquals("msg1",tm1.getText());
+
+ con.close();
+
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(RecoverTest.class);