summaryrefslogtreecommitdiff
path: root/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java')
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java97
1 files changed, 84 insertions, 13 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
index dc0ade76c4..d12ab01bdc 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,19 +19,15 @@
*/
package org.apache.qpid.test.unit.ack;
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
-import org.apache.qpid.testutil.VMBrokerSetup;
-import org.apache.log4j.Logger;
-import org.apache.log4j.xml.DOMConfigurator;
import javax.jms.*;
-import junit.framework.TestCase;
-
public class RecoverTest extends TestCase
{
private static final Logger _logger = Logger.getLogger(RecoverTest.class);
@@ -46,11 +42,9 @@ public class RecoverTest extends TestCase
{
super.tearDown();
TransportConnection.killAllVMBrokers();
- //Thread.sleep(2000);
}
-
public void testRecoverResendsMsgs() throws Exception
{
Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
@@ -133,7 +127,7 @@ public class RecoverTest extends TestCase
_logger.info("Starting connection");
con.start();
TextMessage tm = (TextMessage) consumer.receive();
- TextMessage tm2 = (TextMessage) consumer.receive();
+ consumer.receive();
tm.acknowledge();
_logger.info("Received 2 messages, acknowledge() first message, should acknowledge both");
@@ -150,7 +144,7 @@ public class RecoverTest extends TestCase
_logger.info("Received redelivery of two messages. calling acknolwedgeThis() first of those message");
- ((org.apache.qpid.jms.Message)tm3).acknowledgeThis();
+ ((org.apache.qpid.jms.Message) tm3).acknowledgeThis();
_logger.info("Calling recover");
// all acked so no messages to be delivered
@@ -158,7 +152,7 @@ public class RecoverTest extends TestCase
tm4 = (TextMessage) consumer.receive(3000);
assertEquals("msg4", tm4.getText());
- ((org.apache.qpid.jms.Message)tm4).acknowledgeThis();
+ ((org.apache.qpid.jms.Message) tm4).acknowledgeThis();
_logger.info("Calling recover");
// all acked so no messages to be delivered
@@ -172,6 +166,83 @@ public class RecoverTest extends TestCase
con.close();
}
+ public void testAcknowledgePerConsumer() throws Exception
+ {
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+
+ Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = new AMQQueue("Q1", "Q1", false, true);
+ Queue queue2 = new AMQQueue("Q2", "Q2", false, true);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+ MessageConsumer consumer2 = consumerSession.createConsumer(queue2);
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(queue);
+ MessageProducer producer2 = producerSession.createProducer(queue2);
+
+ producer.send(producerSession.createTextMessage("msg1"));
+ producer2.send(producerSession.createTextMessage("msg2"));
+
+ con2.close();
+
+ _logger.info("Starting connection");
+ con.start();
+
+ TextMessage tm2 = (TextMessage) consumer2.receive();
+ assertNotNull(tm2);
+ assertEquals("msg2", tm2.getText());
+
+ tm2.acknowledge();
+
+ consumerSession.recover();
+
+ TextMessage tm1 = (TextMessage) consumer.receive(2000);
+ assertNotNull(tm1);
+ assertEquals("msg1", tm1.getText());
+
+ con.close();
+
+ }
+
+ public void testRecoverInAutoAckListener() throws Exception
+ {
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+
+ final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = new AMQQueue("Q1", "Q1", false, true);
+ MessageProducer producer = consumerSession.createProducer(queue);
+ producer.send(consumerSession.createTextMessage("hello"));
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+ consumer.setMessageListener(new MessageListener()
+ {
+ private int count = 0;
+
+ public void onMessage(Message message)
+ {
+ try
+ {
+ if (count++ == 0)
+ {
+ assertFalse(message.getJMSRedelivered());
+ consumerSession.recover();
+ }
+ else if (count++ == 1)
+ {
+ assertTrue(message.getJMSRedelivered());
+ }
+ else
+ {
+ fail("Message delivered too many times!");
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error recovering session: " + e, e);
+ }
+ }
+ });
+ }
public static junit.framework.Test suite()
{