diff options
author | Robert Greig <rgreig@apache.org> | 2006-12-18 09:04:09 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2006-12-18 09:04:09 +0000 |
commit | be115e3b33e6319047e8cdc3b913f14f98aa4b23 (patch) | |
tree | 20a5b1552b03e591f3c8b7ebd3e7962bcb3c46a2 /java | |
parent | 1aca5ed069697d2aa0eb743e53e787f79a299902 (diff) | |
download | qpid-python-be115e3b33e6319047e8cdc3b913f14f98aa4b23.tar.gz |
QPID-209 : Patch supplied by Rob Godfrey - Fix acknowledge so it only acknowledges messages that have actually been consumed
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@488159 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
4 files changed, 108 insertions, 30 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 70a5a1efeb..5dee3c1266 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -104,7 +104,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** * Maps from consumer tag (String) to JMSMessageConsumer instance */ - private Map _consumers = new ConcurrentHashMap(); + private Map<String, BasicMessageConsumer> _consumers = new ConcurrentHashMap<String, BasicMessageConsumer>(); /** * Maps from destination to count of JMSMessageConsumers @@ -138,7 +138,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private volatile AtomicBoolean _stopped = new AtomicBoolean(true); - private final AtomicLong _lastDeliveryTag = new AtomicLong(); + /** @@ -174,7 +174,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { if (message.deliverBody != null) { - final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.deliverBody.consumerTag); + final BasicMessageConsumer consumer = _consumers.get(message.deliverBody.consumerTag); if (consumer == null) { @@ -184,7 +184,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } else { - + consumer.notifyMessage(message, _channelId); } @@ -467,10 +467,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { // Acknowledge up to message last delivered (if any) for each consumer. //need to send ack for messages delivered to consumers so far - for (Iterator i = _consumers.values().iterator(); i.hasNext();) + for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) { //Sends acknowledgement to server - ((BasicMessageConsumer) i.next()).acknowledgeLastDelivered(); + i.next().acknowledgeLastDelivered(); } // Commits outstanding messages sent and outstanding acknowledgements. @@ -652,12 +652,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } // we need to clone the list of consumers since the close() method updates the _consumers collection // which would result in a concurrent modification exception - final ArrayList clonedConsumers = new ArrayList(_consumers.values()); + final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList(_consumers.values()); - final Iterator it = clonedConsumers.iterator(); + final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator(); while (it.hasNext()) { - final BasicMessageConsumer con = (BasicMessageConsumer) it.next(); + final BasicMessageConsumer con = it.next(); if (error != null) { con.notifyError(error); @@ -678,12 +678,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } // we need to clone the list of consumers since the close() method updates the _consumers collection // which would result in a concurrent modification exception - final ArrayList clonedConsumers = new ArrayList(_consumers.values()); + final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values()); - final Iterator it = clonedConsumers.iterator(); + final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator(); while (it.hasNext()) { - final BasicMessageConsumer con = (BasicMessageConsumer) it.next(); + final BasicMessageConsumer con = it.next(); con.markClosed(); } // at this point the _consumers map will be empty @@ -702,27 +702,22 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false)); } - public void acknowledge() throws JMSException { - if (getAMQConnection().isClosed()) + if(isClosed()) { - throw new javax.jms.IllegalStateException("Connection is already closed"); + throw new IllegalStateException("Session is already closed"); } - if (isClosed()) + for(BasicMessageConsumer consumer : _consumers.values()) { - throw new javax.jms.IllegalStateException("Session is already closed"); + consumer.acknowledge(); } - acknowledgeMessage(_lastDeliveryTag.get(), true); - } - void setLastDeliveredMessage(AbstractJMSMessage message) - { - _lastDeliveryTag.set(message.getDeliveryTag()); } + public MessageListener getMessageListener() throws JMSException { checkNotClosed(); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 9f9038fddd..a94ce09b8e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -38,12 +38,17 @@ import javax.jms.Message; import javax.jms.MessageListener; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.Iterator; public class BasicMessageConsumer extends Closeable implements MessageConsumer { - private static final Logger _logger = Logger.getLogger(BasicMessageConsumer.class); + private static final Logger _logger = Logger.getLogger(BasicMessageConsumer.class); /** * The connection being used by this consumer @@ -80,7 +85,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer * Used in the blocking receive methods to receive a message from * the Session thread. Argument true indicates we want strict FIFO semantics */ - private final SynchronousQueue _synchronousQueue = new SynchronousQueue(true); + private final ArrayBlockingQueue _synchronousQueue; private MessageFactoryRegistry _messageFactory; @@ -132,6 +137,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer */ private boolean _dups_ok_acknowledge_send; + private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>(); + protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, @@ -150,6 +157,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _prefetchLow = prefetchLow; _exclusive = exclusive; _acknowledgeMode = acknowledgeMode; + _synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true); } public AMQDestination getDestination() @@ -217,7 +225,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer AbstractJMSMessage jmsMsg = (AbstractJMSMessage)_synchronousQueue.poll(); if (jmsMsg != null) { - _session.setLastDeliveredMessage(jmsMsg); + preApplicationProcessing(jmsMsg); messageListener.onMessage(jmsMsg); postDeliver(jmsMsg); } @@ -225,6 +233,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } + private void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException + { + if(_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) + { + _unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag()); + } + } + private void acquireReceiving() throws JMSException { if (!_receiving.compareAndSet(false, true)) @@ -297,7 +313,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) { - _session.setLastDeliveredMessage(m); + preApplicationProcessing(m); postDeliver(m); } @@ -326,7 +342,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) { - _session.setLastDeliveredMessage(m); + preApplicationProcessing(m); postDeliver(m); } @@ -385,6 +401,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } deregisterConsumer(); + _unacknowledgedDeliveryTags.clear(); } } } @@ -421,6 +438,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer messageFrame.bodies); _logger.debug("Message is of type: " + jmsMessage.getClass().getName()); + jmsMessage.setConsumer(this); preDeliver(jmsMessage); @@ -428,7 +446,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { //we do not need a lock around the test above, and the dispatch below as it is invalid //for an application to alter an installed listener while the session is started - _session.setLastDeliveredMessage(jmsMessage); + preApplicationProcessing(jmsMessage); getMessageListener().onMessage(jmsMessage); postDeliver(jmsMessage); } @@ -554,4 +572,22 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer throw new javax.jms.IllegalStateException("Invalid Session"); } } + + public void acknowledge() throws JMSException + { + if(!isClosed()) + { + + Iterator<Long> tags = _unacknowledgedDeliveryTags.iterator(); + while(tags.hasNext()) + { + _session.acknowledgeMessage(tags.next(), false); + tags.remove(); + } + } + else + { + throw new IllegalStateException("Consumer is closed"); + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index aaf7fa41bf..23d6c0151e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -26,7 +26,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.*; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; @@ -48,7 +48,8 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach private boolean _readableProperties = false; private boolean _readableMessage = false; private Destination _destination; - + private BasicMessageConsumer _consumer; + protected AbstractJMSMessage(ByteBuffer data) { super(new BasicContentHeaderProperties()); @@ -533,4 +534,9 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach _readableMessage = true; } } + + public void setConsumer(BasicMessageConsumer basicMessageConsumer) + { + _consumer = basicMessageConsumer; + } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java index 9ff02c3b71..817fcfb9e8 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -169,7 +169,48 @@ public class RecoverTest extends TestCase con.close(); } + public void testAcknowledgePerConsumer() throws Exception + { + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + + Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = new AMQQueue("Q1", "Q1", false, true); + Queue queue2 = new AMQQueue("Q2", "Q2", false, true); + MessageConsumer consumer = consumerSession.createConsumer(queue); + MessageConsumer consumer2 = consumerSession.createConsumer(queue2); + //force synch to ensure the consumer has resulted in a bound queue + ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct"); + + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(queue); + MessageProducer producer2 = producerSession.createProducer(queue2); + + producer.send(producerSession.createTextMessage("msg1")); + producer2.send(producerSession.createTextMessage("msg2")); + + con2.close(); + + _logger.info("Starting connection"); + con.start(); + + TextMessage tm2 = (TextMessage) consumer2.receive(); + assertNotNull(tm2); + assertEquals("msg2",tm2.getText()); + + tm2.acknowledge(); + + consumerSession.recover(); + + TextMessage tm1 = (TextMessage) consumer.receive(2000); + assertNotNull(tm1); + assertEquals("msg1",tm1.getText()); + + con.close(); + + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(RecoverTest.class); |