summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-18 12:12:57 +0000
committerRobert Greig <rgreig@apache.org>2006-12-18 12:12:57 +0000
commit5df8a91b3e88b8f75507cf83ded634bf0d165607 (patch)
tree998661974805b406fa60ab881ee3e04fff315a4c
parent6cac9687ac715ff4da0ea4add6ab1b9abeb2e7eb (diff)
downloadqpid-python-5df8a91b3e88b8f75507cf83ded634bf0d165607.tar.gz
QPID-209 Fix to clear unacked message list on recover()
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@488249 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java17
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java23
2 files changed, 23 insertions, 17 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 5dee3c1266..183865ac21 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -23,23 +23,22 @@ package org.apache.qpid.client;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
-import org.apache.qpid.server.handler.ExchangeBoundHandler;
-import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.*;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.handler.ExchangeBoundHandler;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
-
import javax.jms.*;
import javax.jms.IllegalStateException;
import java.io.Serializable;
@@ -50,7 +49,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
@@ -184,7 +182,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
else
{
-
+
consumer.notifyMessage(message, _channelId);
}
@@ -698,7 +696,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkNotClosed();
checkNotTransacted(); // throws IllegalStateException if a transacted session
-
+ for (BasicMessageConsumer consumer : _consumers.values())
+ {
+ consumer.clearUnackedMessages();
+ }
_connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false));
}
@@ -1474,7 +1475,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
String subscriptionName = _reverseSubscriptionMap.remove(consumer);
if(subscriptionName != null)
{
- _subscriptions.remove(subscriptionName);
+ _subscriptions.remove(subscriptionName);
}
Destination dest = consumer.getDestination();
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index a94ce09b8e..3a5de6f10c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -36,19 +36,16 @@ import org.apache.qpid.jms.Session;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.Iterator;
public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
- private static final Logger _logger = Logger.getLogger(BasicMessageConsumer.class);
+ private static final Logger _logger = Logger.getLogger(BasicMessageConsumer.class);
/**
* The connection being used by this consumer
@@ -296,7 +293,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public Message receive(long l) throws JMSException
{
checkPreConditions();
-
+
acquireReceiving();
try
@@ -316,7 +313,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
preApplicationProcessing(m);
postDeliver(m);
}
-
+
return m;
}
catch (InterruptedException e)
@@ -590,4 +587,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
throw new IllegalStateException("Consumer is closed");
}
}
+
+ /**
+ * Called on recovery to reset the list of delivery tags
+ */
+ public void clearUnackedMessages()
+ {
+ _unacknowledgedDeliveryTags.clear();
+ }
}