diff options
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java')
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java | 429 |
1 files changed, 429 insertions, 0 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java new file mode 100644 index 0000000000..d73d012250 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java @@ -0,0 +1,429 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.ack; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.util.FileUtils; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.jms.TransactionRolledBackException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.io.File; + +/** + * The AcknowlegeAfterFailoverOnMessageTests + * + * Extends the OnMessage AcknowledgeTests to validate that after the client has + * failed over that the client can still receive and ack messages. + * + * All the AcknowledgeTest ack modes are exercised here though some are disabled + * due to know issues (e.g. DupsOk, AutoAck : QPID-143 and the clientAck + * and dirtyClientAck due to QPID-1816) + * + * This class has two main test structures, overrides of AcknowledgeOnMessageTest + * to perform the clean acking based on session ack mode and a series of dirty + * ack tests that test what happends if you receive a message then try and ack + * AFTER you have failed over. + * + * + */ +public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessageTest implements ConnectionListener +{ + + protected CountDownLatch _failoverCompleted = new CountDownLatch(1); + private MessageListener _listener = null; + + @Override + public void setUp() throws Exception + { + super.setUp(); + NUM_MESSAGES = 10; + } + + /** + * Override default init to add connectionListener so we can verify that + * failover took place + * + * @param transacted create a transacted session for this test + * @param mode if not transacted what ack mode to use for this test + * + * @throws Exception if a problem occured during test setup. + */ + @Override + public void init(boolean transacted, int mode) throws Exception + { + super.init(transacted, mode); + ((AMQConnection) _connection).setConnectionListener(this); + // Override the listener for the dirtyAck testing. + if (_listener != null) + { + _consumer.setMessageListener(_listener); + } + } + + /** + * Prepare the broker for the next round. + * + * Called after acknowledging the messsage this method shuts the current + * broker down connnects to the new broker and send a new message for the + * client to failover to and receive. + * + * It ends by restarting the orignal broker so that the cycle can repeat. + * + * When we are able to cluster the java broker then will not need to do the + * message repopulation or QPID_WORK clearing. All that we will need to do + * is send the initial NUM_MESSAGES during startup and then bring the + * brokers down at the right time to cause the client to fail between them. + * + * @param index + * @throws Exception + */ + protected void prepBroker(int index) throws Exception + { + // Alternate killing the broker based on the message index we are at. + + if (index % 2 == 0) + { + failBroker(getFailingPort()); + // Clean up the failed broker + FileUtils.delete(new File(System.getProperty("QPID_WORK") + "/" + getFailingPort()), true); + } + else + { + failBroker(getPort()); + // Clean up the failed broker + FileUtils.delete(new File(System.getProperty("QPID_WORK") + "/" + getPort()), true); + } + + _failoverCompleted = new CountDownLatch(1); + + _logger.info("AAFOMT: prepNewBroker for message send"); + Connection connection = getConnection(); + + try + { + + //Stop the connection whilst we repopulate the broker, or the no_ack + // test will drain the msgs before we can check we put the right number + // back on again. + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + // ensure destination is created. + session.createConsumer(_queue).close(); + + + // If this is the last message then we can skip the send. + // But we MUST ensure that we have created the queue with the + // above createConsumer(_queue).close() as the test will end by + // testing the queue depth which will fail if we don't ensure we + // declare the queue. + // index is 0 based so we need to check +1 against NUM_MESSAGES + if ((index + 1) == NUM_MESSAGES) + { + return; + } + + + sendMessage(session, _queue, 1, index + 1, 0); + + // Validate that we have the message on the queue + // In NoAck mode though the messasge may already have been sent to + // the client so we have to skip the vaildation. + if (_consumerSession.getAcknowledgeMode() != AMQSession.NO_ACKNOWLEDGE) + { + assertEquals("Wrong number of messages on queue", 1, + ((AMQSession) session).getQueueDepth((AMQDestination) _queue)); + } + + + } + catch (Exception e) + { + fail("Unable to prep new broker," + e.getMessage()); + } + finally + { + connection.close(); + } + + try + { + + //Restart the broker + if (index % 2 == 0) + { + startBroker(getFailingPort()); + } + else + { + startBroker(getPort()); + } + } + catch (Exception e) + { + fail("Unable to start failover broker," + e.getMessage()); + } + + } + + @Override + public void doAcknowlegement(Message msg) throws JMSException + { + //Acknowledge current message + super.doAcknowlegement(msg); + + try + { + prepBroker(msg.getIntProperty(INDEX)); + } + catch (Exception e) + { + // Provide details of what went wrong with the stack trace + e.printStackTrace(); + fail("Unable to prep new broker," + e); + } + } + + // Instance varilable for DirtyAcking test + int _msgCount = 0; + boolean _cleaned = false; + + class DirtyAckingHandler implements MessageListener + { + /** + * Validate first message but do nothing with it. + * + * Failover + * + * The receive the message again + * + * @param message + */ + public void onMessage(Message message) + { + // Stop processing if we have an error and had to stop running. + if (_receivedAll.getCount() == 0) + { + _logger.debug("Dumping msgs due to error(" + _causeOfFailure.get().getMessage() + "):" + message); + return; + } + + try + { + // Check we have the next message as expected + assertNotNull("Message " + _msgCount + " not correctly received.", message); + assertEquals("Incorrect message received", _msgCount, message.getIntProperty(INDEX)); + + if (_msgCount == 0 && _failoverCompleted.getCount() != 0) + { + // This is the first message we've received so lets fail the broker + + failBroker(getFailingPort()); + + repopulateBroker(); + + _logger.error("Received first msg so failing over"); + + return; + } + + _msgCount++; + + // Don't acknowlege the first message after failover so we can commit + // them together + if (_msgCount == 1) + { + _logger.error("Received first msg after failover ignoring:" + _msgCount); + + // Acknowledge the first message if we are now on the cleaned pass + if (_cleaned) + { + _receivedAll.countDown(); + } + + return; + } + + if (_consumerSession.getTransacted()) + { + try + { + _consumerSession.commit(); + if (!_cleaned) + { + fail("Session is dirty we should get an TransactionRolledBackException"); + } + } + catch (TransactionRolledBackException trbe) + { + //expected path + } + } + else + { + try + { + message.acknowledge(); + if (!_cleaned) + { + fail("Session is dirty we should get an IllegalStateException"); + } + } + catch (javax.jms.IllegalStateException ise) + { + assertEquals("Incorrect Exception thrown", "has failed over", ise.getMessage()); + // Recover the sesion and try again. + _consumerSession.recover(); + } + } + + // Acknowledge the last message if we are in a clean state + // this will then trigger test teardown. + if (_cleaned) + { + _receivedAll.countDown(); + } + + //Reset message count so we can try again. + _msgCount = 0; + _cleaned = true; + } + catch (Exception e) + { + // If something goes wrong stop and notifiy main thread. + fail(e); + } + } + } + + /** + * Test that Acking/Committing a message received before failover causes + * an exception at commit/ack time. + * + * Expected behaviour is that in: + * * tx mode commit() throws a transacted RolledBackException + * * client ack mode throws an IllegalStateException + * + * @param transacted is this session trasacted + * @param mode What ack mode should be used if not trasacted + * + * @throws Exception if something goes wrong. + */ + protected void testDirtyAcking(boolean transacted, int mode) throws Exception + { + NUM_MESSAGES = 2; + _listener = new DirtyAckingHandler(); + + super.testAcking(transacted, mode); + } + + public void testDirtyClientAck() throws Exception + { + testDirtyAcking(false, Session.CLIENT_ACKNOWLEDGE); + } + + public void testDirtyAckingTransacted() throws Exception + { + testDirtyAcking(true, Session.SESSION_TRANSACTED); + } + + private void repopulateBroker() throws Exception + { + // Repopulate this new broker so we can test what happends after failover + + //Get the connection to the first (main port) broker. + Connection connection = getConnection(); + // Use a transaction to send messages so we can be sure they arrive. + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + // ensure destination is created. + session.createConsumer(_queue).close(); + + sendMessage(session, _queue, NUM_MESSAGES); + + assertEquals("Wrong number of messages on queue", NUM_MESSAGES, + ((AMQSession) session).getQueueDepth((AMQDestination) _queue)); + + connection.close(); + } + + // AMQConnectionListener Interface.. used so we can validate that we + // actually failed over. + + public void bytesSent(long count) + { + } + + public void bytesReceived(long count) + { + } + + public boolean preFailover(boolean redirect) + { + //Allow failover + return true; + } + + public boolean preResubscribe() + { + //Allow failover + return true; + } + + public void failoverComplete() + { + _failoverCompleted.countDown(); + } + + /** + * Override so we can block until failover has completd + * + * @param port + */ + @Override + public void failBroker(int port) + { + super.failBroker(port); + + try + { + if (!_failoverCompleted.await(DEFAULT_FAILOVER_TIME, TimeUnit.MILLISECONDS)) + { + // Use an exception so that we use our local fail() that notifies the main thread of failure + throw new Exception("Failover did not occur in specified time:" + DEFAULT_FAILOVER_TIME); + } + + } + catch (Exception e) + { + // Use an exception so that we use our local fail() that notifies the main thread of failure + fail(e); + } + } + +} |