diff options
Diffstat (limited to 'java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java')
-rw-r--r-- | java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java | 97 |
1 files changed, 84 insertions, 13 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java index dc0ade76c4..d12ab01bdc 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,19 +19,15 @@ */ package org.apache.qpid.test.unit.ack; +import junit.framework.TestCase; +import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; -import org.apache.qpid.testutil.VMBrokerSetup; -import org.apache.log4j.Logger; -import org.apache.log4j.xml.DOMConfigurator; import javax.jms.*; -import junit.framework.TestCase; - public class RecoverTest extends TestCase { private static final Logger _logger = Logger.getLogger(RecoverTest.class); @@ -46,11 +42,9 @@ public class RecoverTest extends TestCase { super.tearDown(); TransportConnection.killAllVMBrokers(); - //Thread.sleep(2000); } - public void testRecoverResendsMsgs() throws Exception { Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); @@ -133,7 +127,7 @@ public class RecoverTest extends TestCase _logger.info("Starting connection"); con.start(); TextMessage tm = (TextMessage) consumer.receive(); - TextMessage tm2 = (TextMessage) consumer.receive(); + consumer.receive(); tm.acknowledge(); _logger.info("Received 2 messages, acknowledge() first message, should acknowledge both"); @@ -150,7 +144,7 @@ public class RecoverTest extends TestCase _logger.info("Received redelivery of two messages. calling acknolwedgeThis() first of those message"); - ((org.apache.qpid.jms.Message)tm3).acknowledgeThis(); + ((org.apache.qpid.jms.Message) tm3).acknowledgeThis(); _logger.info("Calling recover"); // all acked so no messages to be delivered @@ -158,7 +152,7 @@ public class RecoverTest extends TestCase tm4 = (TextMessage) consumer.receive(3000); assertEquals("msg4", tm4.getText()); - ((org.apache.qpid.jms.Message)tm4).acknowledgeThis(); + ((org.apache.qpid.jms.Message) tm4).acknowledgeThis(); _logger.info("Calling recover"); // all acked so no messages to be delivered @@ -172,6 +166,83 @@ public class RecoverTest extends TestCase con.close(); } + public void testAcknowledgePerConsumer() throws Exception + { + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + + Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = new AMQQueue("Q1", "Q1", false, true); + Queue queue2 = new AMQQueue("Q2", "Q2", false, true); + MessageConsumer consumer = consumerSession.createConsumer(queue); + MessageConsumer consumer2 = consumerSession.createConsumer(queue2); + + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(queue); + MessageProducer producer2 = producerSession.createProducer(queue2); + + producer.send(producerSession.createTextMessage("msg1")); + producer2.send(producerSession.createTextMessage("msg2")); + + con2.close(); + + _logger.info("Starting connection"); + con.start(); + + TextMessage tm2 = (TextMessage) consumer2.receive(); + assertNotNull(tm2); + assertEquals("msg2", tm2.getText()); + + tm2.acknowledge(); + + consumerSession.recover(); + + TextMessage tm1 = (TextMessage) consumer.receive(2000); + assertNotNull(tm1); + assertEquals("msg1", tm1.getText()); + + con.close(); + + } + + public void testRecoverInAutoAckListener() throws Exception + { + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + + final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = new AMQQueue("Q1", "Q1", false, true); + MessageProducer producer = consumerSession.createProducer(queue); + producer.send(consumerSession.createTextMessage("hello")); + MessageConsumer consumer = consumerSession.createConsumer(queue); + consumer.setMessageListener(new MessageListener() + { + private int count = 0; + + public void onMessage(Message message) + { + try + { + if (count++ == 0) + { + assertFalse(message.getJMSRedelivered()); + consumerSession.recover(); + } + else if (count++ == 1) + { + assertTrue(message.getJMSRedelivered()); + } + else + { + fail("Message delivered too many times!"); + } + } + catch (JMSException e) + { + _logger.error("Error recovering session: " + e, e); + } + } + }); + } public static junit.framework.Test suite() { |