summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java27
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java5
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java57
3 files changed, 68 insertions, 21 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index ce624cb91b..55d3ccb6e7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -3212,28 +3212,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void rejectPending(C consumer)
{
- synchronized (_lock)
- {
- boolean stopped = connectionStopped();
+ // Reject messages on pre-receive queue
+ consumer.rollbackPendingMessages();
- if (!stopped)
- {
- setConnectionStopped(true);
- }
+ // Reject messages on pre-dispatch queue
+ rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false);
- // Reject messages on pre-receive queue
- consumer.rollbackPendingMessages();
+ // closeConsumer
+ consumer.markClosed();
- // Reject messages on pre-dispatch queue
- rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false);
- //Let the dispatcher deal with this when it gets to them.
-
- // closeConsumer
- consumer.markClosed();
-
- setConnectionStopped(stopped);
-
- }
}
public void rollback()
@@ -3425,7 +3412,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
final C consumer = _consumers.get(message.getConsumerTag());
- if ((consumer == null) || consumer.isClosed())
+ if ((consumer == null) || consumer.isClosed() || consumer.isClosing())
{
if (_dispatcherLogger.isInfoEnabled())
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 0d717a3216..0f8b5717d6 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -593,7 +593,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
// no point otherwise as the connection will be gone
if (!_session.isClosed() || _session.isClosing())
{
- sendCancel();
+ synchronized(_session.getMessageDeliveryLock())
+ {
+ sendCancel();
+ }
cleanupQueue();
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java
new file mode 100644
index 0000000000..907290933a
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java
@@ -0,0 +1,57 @@
+package org.apache.qpid.test.unit.close;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class MessageConsumerCloseTest extends QpidBrokerTestCase
+{
+ Exception _exception;
+
+ public void testConsumerCloseAndSessionRollback() throws Exception
+ {
+ Connection connection = getConnection();
+ final CountDownLatch receiveLatch = new CountDownLatch(1);
+ final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Destination destination = getTestQueue();
+ MessageConsumer consumer = session.createConsumer(destination);
+ sendMessage(session, destination, 2);
+ connection.start();
+ consumer.setMessageListener(new MessageListener()
+ {
+ @Override
+ public void onMessage(Message message)
+ {
+ try
+ {
+ receiveLatch.countDown();
+ session.rollback();
+ }
+ catch (JMSException e)
+ {
+ _exception = e;
+ }
+ }
+ });
+ boolean messageReceived = receiveLatch.await(1l, TimeUnit.SECONDS);
+ consumer.close();
+
+ assertNull("Exception occured on rollback:" + _exception, _exception);
+ assertTrue("Message is not received", messageReceived);
+
+ consumer = session.createConsumer(destination);
+ Message message1 = consumer.receive(1000l);
+ assertNotNull("message1 is not received", message1);
+ Message message2 = consumer.receive(1000l);
+ assertNotNull("message2 is not received", message2);
+ }
+}