From 5df8a91b3e88b8f75507cf83ded634bf0d165607 Mon Sep 17 00:00:00 2001 From: Robert Greig Date: Mon, 18 Dec 2006 12:12:57 +0000 Subject: QPID-209 Fix to clear unacked message list on recover() git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@488249 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 17 ++++++++-------- .../apache/qpid/client/BasicMessageConsumer.java | 23 +++++++++++++--------- 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 5dee3c1266..183865ac21 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -23,23 +23,22 @@ package org.apache.qpid.client; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; -import org.apache.qpid.server.handler.ExchangeBoundHandler; -import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.client.failover.FailoverSupport; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.protocol.AMQMethodEvent; +import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.*; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.handler.ExchangeBoundHandler; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.URLSyntaxException; - import javax.jms.*; import javax.jms.IllegalStateException; import java.io.Serializable; @@ -50,7 +49,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; public class AMQSession extends Closeable implements Session, QueueSession, TopicSession { @@ -184,7 +182,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } else { - + consumer.notifyMessage(message, _channelId); } @@ -698,7 +696,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); checkNotTransacted(); // throws IllegalStateException if a transacted session - + for (BasicMessageConsumer consumer : _consumers.values()) + { + consumer.clearUnackedMessages(); + } _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false)); } @@ -1474,7 +1475,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String subscriptionName = _reverseSubscriptionMap.remove(consumer); if(subscriptionName != null) { - _subscriptions.remove(subscriptionName); + _subscriptions.remove(subscriptionName); } Destination dest = consumer.getDestination(); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index a94ce09b8e..3a5de6f10c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -36,19 +36,16 @@ import org.apache.qpid.jms.Session; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.Iterator; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.Iterator; public class BasicMessageConsumer extends Closeable implements MessageConsumer { - private static final Logger _logger = Logger.getLogger(BasicMessageConsumer.class); + private static final Logger _logger = Logger.getLogger(BasicMessageConsumer.class); /** * The connection being used by this consumer @@ -296,7 +293,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public Message receive(long l) throws JMSException { checkPreConditions(); - + acquireReceiving(); try @@ -316,7 +313,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer preApplicationProcessing(m); postDeliver(m); } - + return m; } catch (InterruptedException e) @@ -590,4 +587,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer throw new IllegalStateException("Consumer is closed"); } } + + /** + * Called on recovery to reset the list of delivery tags + */ + public void clearUnackedMessages() + { + _unacknowledgedDeliveryTags.clear(); + } } -- cgit v1.2.1