summaryrefslogtreecommitdiff
path: root/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java66
1 files changed, 64 insertions, 2 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index e475270ecd..783678f67c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -47,6 +47,7 @@ import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
+import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -289,6 +290,61 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
+
+ /**
+ * The dispatcher should be stopped when calling this.
+ *
+ * @param consumerTag
+ */
+ public void removePending(String consumerTag)
+ {
+
+ synchronized (_lock)
+ {
+ boolean stopped = connectionStopped();
+
+ _dispatcher.setConnectionStopped(false);
+
+ LinkedList<UnprocessedMessage> tmpList = new LinkedList<UnprocessedMessage>();
+
+ while (_queue.size() != 0)
+ {
+ UnprocessedMessage message = null;
+ try
+ {
+ message = (UnprocessedMessage) _queue.take();
+
+ if (!message.deliverBody.consumerTag.equals(consumerTag))
+ {
+ tmpList.add(message);
+ }
+ else
+ {
+ _logger.error("Pruned pending message for consumer:" + consumerTag);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ _logger.error("Interrupted whilst taking message");
+ }
+ }
+
+ if (!tmpList.isEmpty())
+ {
+ _logger.error("Tmp list is not empty");
+ }
+
+ for (UnprocessedMessage msg : tmpList)
+ {
+ _queue.add(msg);
+ }
+
+ if (stopped)
+ {
+ _dispatcher.setConnectionStopped(stopped);
+ }
+ }
+ }
}
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
@@ -599,8 +655,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
//Ensure we only try and close an open session.
if (!_closed.getAndSet(true))
{
- // we pass null since this is not an error case
- closeProducersAndConsumers(null);
try
{
@@ -618,6 +672,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// When control resumes at this point, a reply will have been received that
// indicates the broker has closed the channel successfully
+ // we pass null since this is not an error case
+ closeProducersAndConsumers(null);
+
}
catch (AMQException e)
{
@@ -1784,7 +1841,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
void deregisterConsumer(BasicMessageConsumer consumer)
{
+ //need to clear pending messages from session _queue that the dispatcher will handle
+ // or we will get
+ // _dispatcher.removePending(consumer.getConsumerTag());
+
_consumers.remove(consumer.getConsumerTag());
+
String subscriptionName = _reverseSubscriptionMap.remove(consumer);
if (subscriptionName != null)
{