diff options
Diffstat (limited to 'trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java')
-rw-r--r-- | trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java | 331 |
1 files changed, 0 insertions, 331 deletions
diff --git a/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java deleted file mode 100644 index 7434fcbb30..0000000000 --- a/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ /dev/null @@ -1,331 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.ack; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.jms.Session; -import org.apache.qpid.test.utils.QpidTestCase; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.TextMessage; - -import java.util.concurrent.atomic.AtomicInteger; - -public class RecoverTest extends QpidTestCase -{ - private static final Logger _logger = LoggerFactory.getLogger(RecoverTest.class); - - private Exception _error; - private AtomicInteger count; - - protected void setUp() throws Exception - { - super.setUp(); - _error = null; - count = new AtomicInteger(); - } - - protected void tearDown() throws Exception - { - super.tearDown(); - count = null; - } - - public void testRecoverResendsMsgs() throws Exception - { - AMQConnection con = (AMQConnection) getConnection("guest", "guest"); - - Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Queue queue = - new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("someQ"), - new AMQShortString("someQ"), false, true); - MessageConsumer consumer = consumerSession.createConsumer(queue); - // force synch to ensure the consumer has resulted in a bound queue - // ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); - // This is the default now - - AMQConnection con2 = (AMQConnection) getConnection("guest", "guest"); - Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageProducer producer = producerSession.createProducer(queue); - - _logger.info("Sending four messages"); - producer.send(producerSession.createTextMessage("msg1")); - producer.send(producerSession.createTextMessage("msg2")); - producer.send(producerSession.createTextMessage("msg3")); - producer.send(producerSession.createTextMessage("msg4")); - - con2.close(); - - _logger.info("Starting connection"); - con.start(); - TextMessage tm = (TextMessage) consumer.receive(); - tm.acknowledge(); - _logger.info("Received and acknowledged first message"); - consumer.receive(); - consumer.receive(); - consumer.receive(); - _logger.info("Received all four messages. Calling recover with three outstanding messages"); - // no ack for last three messages so when I call recover I expect to get three messages back - consumerSession.recover(); - tm = (TextMessage) consumer.receive(3000); - assertEquals("msg2", tm.getText()); - - tm = (TextMessage) consumer.receive(3000); - assertEquals("msg3", tm.getText()); - - tm = (TextMessage) consumer.receive(3000); - assertEquals("msg4", tm.getText()); - - _logger.info("Received redelivery of three messages. Acknowledging last message"); - tm.acknowledge(); - - _logger.info("Calling acknowledge with no outstanding messages"); - // all acked so no messages to be delivered - consumerSession.recover(); - - tm = (TextMessage) consumer.receiveNoWait(); - assertNull(tm); - _logger.info("No messages redelivered as is expected"); - - con.close(); - } - - public void testRecoverResendsMsgsAckOnEarlier() throws Exception - { - AMQConnection con = (AMQConnection) getConnection("guest", "guest"); - - Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Queue queue = - new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("someQ"), - new AMQShortString("someQ"), false, true); - MessageConsumer consumer = consumerSession.createConsumer(queue); - // force synch to ensure the consumer has resulted in a bound queue - // ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); - // This is the default now - - AMQConnection con2 = (AMQConnection) getConnection("guest", "guest"); - Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageProducer producer = producerSession.createProducer(queue); - - _logger.info("Sending four messages"); - producer.send(producerSession.createTextMessage("msg1")); - producer.send(producerSession.createTextMessage("msg2")); - producer.send(producerSession.createTextMessage("msg3")); - producer.send(producerSession.createTextMessage("msg4")); - - con2.close(); - - _logger.info("Starting connection"); - con.start(); - TextMessage tm = (TextMessage) consumer.receive(); - consumer.receive(); - tm.acknowledge(); - _logger.info("Received 2 messages, acknowledge() first message, should acknowledge both"); - - consumer.receive(); - consumer.receive(); - _logger.info("Received all four messages. Calling recover with two outstanding messages"); - // no ack for last three messages so when I call recover I expect to get three messages back - consumerSession.recover(); - TextMessage tm3 = (TextMessage) consumer.receive(3000); - assertEquals("msg3", tm3.getText()); - - TextMessage tm4 = (TextMessage) consumer.receive(3000); - assertEquals("msg4", tm4.getText()); - - _logger.info("Received redelivery of two messages. calling acknolwedgeThis() first of those message"); - ((org.apache.qpid.jms.Message) tm3).acknowledgeThis(); - - _logger.info("Calling recover"); - // all acked so no messages to be delivered - consumerSession.recover(); - - tm4 = (TextMessage) consumer.receive(3000); - assertEquals("msg4", tm4.getText()); - ((org.apache.qpid.jms.Message) tm4).acknowledgeThis(); - - _logger.info("Calling recover"); - // all acked so no messages to be delivered - consumerSession.recover(); - - tm = (TextMessage) consumer.receiveNoWait(); - assertNull(tm); - _logger.info("No messages redelivered as is expected"); - - con.close(); - } - - public void testAcknowledgePerConsumer() throws Exception - { - AMQConnection con = (AMQConnection) getConnection("guest", "guest"); - - Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Queue queue = - new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"), - false, true); - Queue queue2 = - new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q2"), new AMQShortString("Q2"), - false, true); - MessageConsumer consumer = consumerSession.createConsumer(queue); - MessageConsumer consumer2 = consumerSession.createConsumer(queue2); - - AMQConnection con2 = (AMQConnection) getConnection("guest", "guest"); - Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageProducer producer = producerSession.createProducer(queue); - MessageProducer producer2 = producerSession.createProducer(queue2); - - producer.send(producerSession.createTextMessage("msg1")); - producer2.send(producerSession.createTextMessage("msg2")); - - con2.close(); - - _logger.info("Starting connection"); - con.start(); - - TextMessage tm2 = (TextMessage) consumer2.receive(2000); - assertNotNull(tm2); - assertEquals("msg2", tm2.getText()); - - tm2.acknowledge(); - - consumerSession.recover(); - - TextMessage tm1 = (TextMessage) consumer.receive(2000); - assertNotNull(tm1); - assertEquals("msg1", tm1.getText()); - - con.close(); - - } - - public void testRecoverInAutoAckListener() throws Exception - { - AMQConnection con = (AMQConnection) getConnection("guest", "guest"); - - final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = - new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), new AMQShortString("Q3"), - false, true); - MessageConsumer consumer = consumerSession.createConsumer(queue); - MessageProducer producer = consumerSession.createProducer(queue); - producer.send(consumerSession.createTextMessage("hello")); - - final Object lock = new Object(); - - consumer.setMessageListener(new MessageListener() - { - - public void onMessage(Message message) - { - try - { - count.incrementAndGet(); - if (count.get() == 1) - { - if (message.getJMSRedelivered()) - { - setError( - new Exception("Message marked as redilvered on what should be first delivery attempt")); - } - - consumerSession.recover(); - } - else if (count.get() == 2) - { - if (!message.getJMSRedelivered()) - { - setError( - new Exception( - "Message not marked as redilvered on what should be second delivery attempt")); - } - } - else - { - System.err.println(message); - fail("Message delivered too many times!: " + count); - } - } - catch (JMSException e) - { - _logger.error("Error recovering session: " + e, e); - setError(e); - } - - synchronized (lock) - { - lock.notify(); - } - } - }); - - con.start(); - - long waitTime = 30000L; - long waitUntilTime = System.currentTimeMillis() + waitTime; - - synchronized (lock) - { - while ((count.get() <= 1) && (waitTime > 0)) - { - lock.wait(waitTime); - if (count.get() <= 1) - { - waitTime = waitUntilTime - System.currentTimeMillis(); - } - } - } - - Thread.sleep(1000); - - if (count.get() != 2) - { - System.err.println("Count != 2 : " + count); - } - - assertTrue(count.get() == 2); - - con.close(); - - if (_error != null) - { - throw _error; - } - } - - private void setError(Exception e) - { - _error = e; - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(RecoverTest.class); - } -} |