summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-01-16 10:51:04 +0000
committerRobert Greig <rgreig@apache.org>2007-01-16 10:51:04 +0000
commitf4d558a399c72f571b3575e5ac12ca20d71b0cb7 (patch)
tree80ac33e6ba4763e3bc3d5d7fc50942f2c3a0470e /java
parentc6b6c099bb65db1d0b350ca1736c3f9b5c6ab64b (diff)
downloadqpid-python-f4d558a399c72f571b3575e5ac12ca20d71b0cb7.tar.gz
QPID-299 Messages not being correctly requeued when transacted session closed
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@496658 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java1
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java86
2 files changed, 86 insertions, 1 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 0497d4bb8f..10f039779c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -311,6 +311,7 @@ public class AMQChannel
_txnContext.rollback();
unsubscribeAllConsumers(session);
requeue();
+ _txnContext.commit();
}
private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
index 90a11307b8..bbad5862a0 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
@@ -23,7 +23,12 @@ package org.apache.qpid.test.unit.transacted;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.testutil.VMBrokerSetup;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.mina.util.SessionLog;
+import org.apache.log4j.Logger;
import javax.jms.*;
@@ -47,10 +52,12 @@ public class TransactedTest extends TestCase
private Session testSession;
private MessageConsumer testConsumer1;
private MessageConsumer testConsumer2;
+ private static final Logger _logger = Logger.getLogger(TransactedTest.class);
protected void setUp() throws Exception
{
super.setUp();
+ TransportConnection.createVMBroker(1);
queue1 = new AMQQueue("Q1", false);
queue2 = new AMQQueue("Q2", false);
@@ -86,6 +93,7 @@ public class TransactedTest extends TestCase
con.close();
testCon.close();
prepCon.close();
+ TransportConnection.killAllVMBrokers();
super.tearDown();
}
@@ -132,6 +140,82 @@ public class TransactedTest extends TestCase
assertTrue(null == testConsumer2.receive(1000));
}
+ public void testResendsMsgsAfterSessionClose() throws Exception
+ {
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+
+ Session consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ AMQQueue queue3 = new AMQQueue("Q3", false);
+ MessageConsumer consumer = consumerSession.createConsumer(queue3);
+ //force synch to ensure the consumer has resulted in a bound queue
+ ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Session producerSession = con2.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(queue3);
+
+ _logger.info("Sending four messages");
+ producer.send(producerSession.createTextMessage("msg1"));
+ producer.send(producerSession.createTextMessage("msg2"));
+ producer.send(producerSession.createTextMessage("msg3"));
+ producer.send(producerSession.createTextMessage("msg4"));
+
+ producerSession.commit();
+
+
+ _logger.info("Starting connection");
+ con.start();
+ TextMessage tm = (TextMessage) consumer.receive();
+
+ tm.acknowledge();
+ consumerSession.commit();
+
+ _logger.info("Received and acknowledged first message");
+ tm = (TextMessage) consumer.receive(1000);
+ assertNotNull(tm);
+ tm = (TextMessage) consumer.receive(1000);
+ assertNotNull(tm);
+ tm = (TextMessage) consumer.receive(1000);
+ assertNotNull(tm);
+ _logger.info("Received all four messages. Closing connection with three outstanding messages");
+
+ consumerSession.close();
+
+ consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+
+ consumer = consumerSession.createConsumer(queue3);
+
+ // no ack for last three messages so when I call recover I expect to get three messages back
+
+ tm = (TextMessage) consumer.receive(3000);
+ assertNotNull(tm);
+ assertEquals("msg2", tm.getText());
+
+ tm = (TextMessage) consumer.receive(3000);
+ assertNotNull(tm);
+ assertEquals("msg3", tm.getText());
+
+ tm = (TextMessage) consumer.receive(3000);
+ assertNotNull(tm);
+ assertEquals("msg4", tm.getText());
+
+ _logger.info("Received redelivery of three messages. Acknowledging last message");
+ tm.acknowledge();
+ consumerSession.commit();
+ _logger.info("Calling acknowledge with no outstanding messages");
+ // all acked so no messages to be delivered
+
+
+ tm = (TextMessage) consumer.receiveNoWait();
+ assertNull(tm);
+ _logger.info("No messages redelivered as is expected");
+
+ con.close();
+ con2.close();
+
+ }
+
+
private void expect(String text, Message msg) throws JMSException
{
assertTrue(msg instanceof TextMessage);
@@ -140,6 +224,6 @@ public class TransactedTest extends TestCase
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(TransactedTest.class));
+ return new junit.framework.TestSuite(TransactedTest.class);
}
}