From b60aee6f1f3e167fd42d343cd33235b07b5ffe6b Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Thu, 6 Oct 2011 16:47:12 +0000 Subject: QPID-3524, QPID-3525, QPID-3526, QPID-3527: add new failover behaviour tests for variety of acknowledgement modes Applied patch from Oleksandr Rudyy and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1179703 13f79535-47bb-0310-9956-ffa450edef68 --- .../client/failover/FailoverBehaviourTest.java | 924 +++++++++++++++++++++ 1 file changed, 924 insertions(+) create mode 100644 java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java diff --git a/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java b/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java new file mode 100644 index 0000000000..aa62106c46 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java @@ -0,0 +1,924 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.client.failover; + +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.TransactionRolledBackException; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.test.utils.FailoverBaseCase; + +/** + * Test suite to test all possible failover corner cases + */ +public class FailoverBehaviourTest extends FailoverBaseCase implements ConnectionListener, ExceptionListener +{ + private static final String TEST_MESSAGE_FORMAT = "test message {0}"; + + /** Indicates whether tests are run against clustered broker */ + private static boolean CLUSTERED = Boolean.getBoolean("profile.clustered"); + + /** Default number of messages to send before failover */ + private static final int DEFAULT_NUMBER_OF_MESSAGES = 10; + + /** Actual number of messages to send before failover */ + protected int _messageNumber = Integer.getInteger("profile.failoverMsgCount", DEFAULT_NUMBER_OF_MESSAGES); + + /** Test connection */ + protected Connection _connection; + + /** + * Failover completion latch is used to wait till connectivity to broker is + * restored + */ + private CountDownLatch _failoverComplete; + + /** + * Consumer session + */ + private Session _consumerSession; + + /** + * Test destination + */ + private Destination _destination; + + /** + * Consumer + */ + private MessageConsumer _consumer; + + /** + * Producer session + */ + private Session _producerSession; + + /** + * Producer + */ + private MessageProducer _producer; + + /** + * Holds exception sent into {@link ExceptionListener} on failover + */ + private JMSException _exceptionListenerException; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + _connection = getConnection(); + _connection.setExceptionListener(this); + ((AMQConnection) _connection).setConnectionListener(this); + _failoverComplete = new CountDownLatch(1); + } + + /** + * Test whether MessageProducer can successfully publish messages after + * failover and rollback transaction + */ + public void testMessageProducingAndRollbackAfterFailover() throws Exception + { + init(Session.SESSION_TRANSACTED, true); + produceMessages(); + causeFailure(); + + assertFailoverException(); + // producer should be able to send messages after failover + _producer.send(_producerSession.createTextMessage("test message " + _messageNumber)); + + // rollback after failover + _producerSession.rollback(); + + // tests whether sending and committing is working after failover + produceMessages(); + _producerSession.commit(); + + // tests whether receiving and committing is working after failover + consumeMessages(); + _consumerSession.commit(); + } + + /** + * Test whether {@link TransactionRolledBackException} is thrown on commit + * of dirty transacted session after failover. + *

+ * Verifies whether second after failover commit is successful. + */ + public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnProducingMessages() throws Exception + { + init(Session.SESSION_TRANSACTED, true); + produceMessages(); + causeFailure(); + + assertFailoverException(); + + // producer should be able to send messages after failover + _producer.send(_producerSession.createTextMessage("test message " + _messageNumber)); + + try + { + _producerSession.commit(); + fail("TransactionRolledBackException is expected on commit after failover with dirty session!"); + } + catch (JMSException t) + { + assertTrue("Expected TransactionRolledBackException but thrown " + t, + t instanceof TransactionRolledBackException); + } + + // simulate process of user replaying the transaction + produceMessages("replayed test message {0}", _messageNumber, false); + + // no exception should be thrown + _producerSession.commit(); + + // only messages sent after rollback should be received + consumeMessages("replayed test message {0}", _messageNumber); + + // no exception should be thrown + _consumerSession.commit(); + } + + /** + * Tests JMSException is not thrown on commit with a clean session after + * failover + */ + public void testNoJMSExceptionThrownOnCommitAfterFailoverWithCleanProducerSession() throws Exception + { + init(Session.SESSION_TRANSACTED, true); + + causeFailure(); + + assertFailoverException(); + + // should not throw an exception for a clean session + _producerSession.commit(); + + // tests whether sending and committing is working after failover + produceMessages(); + _producerSession.commit(); + + // tests whether receiving and committing is working after failover + consumeMessages(); + _consumerSession.commit(); + } + + /** + * Tests {@link TransactionRolledBackException} is thrown on commit of dirty + * transacted session after failover. + *

+ * Verifies whether second after failover commit is successful. + */ + public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnMessageReceiving() throws Exception + { + init(Session.SESSION_TRANSACTED, true); + produceMessages(); + _producerSession.commit(); + + // receive messages but do not commit + consumeMessages(); + + causeFailure(); + + assertFailoverException(); + + try + { + // should throw TransactionRolledBackException + _consumerSession.commit(); + fail("TransactionRolledBackException is expected on commit after failover"); + } + catch (Exception t) + { + assertTrue("Expected TransactionRolledBackException but thrown " + t, + t instanceof TransactionRolledBackException); + } + + resendMessagesIfNecessary(); + + // consume messages successfully + consumeMessages(); + _consumerSession.commit(); + } + + /** + * Tests JMSException is not thrown on commit with a clean session after failover + */ + public void testNoJMSExceptionThrownOnCommitAfterFailoverWithCleanConsumerSession() throws Exception + { + init(Session.SESSION_TRANSACTED, true); + produceMessages(); + _producerSession.commit(); + + consumeMessages(); + _consumerSession.commit(); + + causeFailure(); + + assertFailoverException(); + + // should not throw an exception with a clean consumer session + _consumerSession.commit(); + } + + /** + * Test that TransactionRolledBackException is thrown on commit of + * dirty session in asynchronous consumer after failover. + */ + public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnReceivingMessagesAsynchronously() + throws Exception + { + init(Session.SESSION_TRANSACTED, false); + FailoverTestMessageListener ml = new FailoverTestMessageListener(); + _consumer.setMessageListener(ml); + + _connection.start(); + + produceMessages(); + _producerSession.commit(); + + // wait for message receiving + ml.awaitForEnd(); + + assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter()); + + // assert messages + int counter = 0; + for (Message message : ml.getReceivedMessages()) + { + assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++); + } + ml.reset(); + + causeFailure(); + assertFailoverException(); + + + try + { + _consumerSession.commit(); + fail("TransactionRolledBackException should be thrown!"); + } + catch (TransactionRolledBackException e) + { + // that is what is expected + } + + resendMessagesIfNecessary(); + + // wait for message receiving + ml.awaitForEnd(); + + assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter()); + + // assert messages + counter = 0; + for (Message message : ml.getReceivedMessages()) + { + assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++); + } + + // commit again. It should be successful + _consumerSession.commit(); + } + + /** + * Test that {@link Session#rollback()} does not throw exception after failover + * and that we are able to consume messages. + */ + public void testRollbackAfterFailover() throws Exception + { + init(Session.SESSION_TRANSACTED, true); + + produceMessages(); + _producerSession.commit(); + + consumeMessages(); + + causeFailure(); + + assertFailoverException(); + + _consumerSession.rollback(); + + resendMessagesIfNecessary(); + + // tests whether receiving and committing is working after failover + consumeMessages(); + _consumerSession.commit(); + } + + /** + * Test that {@link Session#rollback()} does not throw exception after receiving further messages + * after failover, and we can receive published messages after rollback. + */ + public void testRollbackAfterReceivingAfterFailover() throws Exception + { + init(Session.SESSION_TRANSACTED, true); + + produceMessages(); + _producerSession.commit(); + + consumeMessages(); + causeFailure(); + + assertFailoverException(); + + resendMessagesIfNecessary(); + + consumeMessages(); + + _consumerSession.rollback(); + + // tests whether receiving and committing is working after failover + consumeMessages(); + _consumerSession.commit(); + } + + /** + * Test that {@link Session#recover()} does not throw an exception after failover + * and that we can consume messages after recover. + */ + public void testRecoverAfterFailover() throws Exception + { + init(Session.CLIENT_ACKNOWLEDGE, true); + + produceMessages(); + + // consume messages but do not acknowledge them + consumeMessages(); + + causeFailure(); + + assertFailoverException(); + + _consumerSession.recover(); + + resendMessagesIfNecessary(); + + // tests whether receiving and acknowledgment is working after recover + Message lastMessage = consumeMessages(); + lastMessage.acknowledge(); + } + + /** + * Test that receiving more messages after failover and then calling + * {@link Session#recover()} does not throw an exception + * and that we can consume messages after recover. + */ + public void testRecoverWithConsumedMessagesAfterFailover() throws Exception + { + init(Session.CLIENT_ACKNOWLEDGE, true); + + produceMessages(); + + // consume messages but do not acknowledge them + consumeMessages(); + + causeFailure(); + + assertFailoverException(); + + // publishing should work after failover + resendMessagesIfNecessary(); + + // consume messages again on a dirty session + consumeMessages(); + + // recover should successfully restore session + _consumerSession.recover(); + + // tests whether receiving and acknowledgment is working after recover + Message lastMessage = consumeMessages(); + lastMessage.acknowledge(); + } + + /** + * Test that first call to {@link Message#acknowledge()} after failover + * throws a JMSEXception if session is dirty. + */ + public void testAcknowledgeAfterFailover() throws Exception + { + init(Session.CLIENT_ACKNOWLEDGE, true); + + produceMessages(); + + // consume messages but do not acknowledge them + Message lastMessage = consumeMessages(); + causeFailure(); + + assertFailoverException(); + + try + { + // an implicit recover performed when acknowledge throws an exception due to failover + lastMessage.acknowledge(); + fail("JMSException should be thrown"); + } + catch (JMSException t) + { + // TODO: assert error code and/or expected exception type + } + + resendMessagesIfNecessary(); + + // tests whether receiving and acknowledgment is working after recover + lastMessage = consumeMessages(); + lastMessage.acknowledge(); + } + + /** + * Test that receiving of messages after failover prior to calling + * {@link Message#acknowledge()} still results in acknowledge throwing an exception. + */ + public void testAcknowledgeAfterMessageReceivingAfterFailover() throws Exception + { + init(Session.CLIENT_ACKNOWLEDGE, true); + + produceMessages(); + + // consume messages but do not acknowledge them + consumeMessages(); + causeFailure(); + + assertFailoverException(); + + resendMessagesIfNecessary(); + + // consume again on dirty session + Message lastMessage = consumeMessages(); + try + { + // an implicit recover performed when acknowledge throws an exception due to failover + lastMessage.acknowledge(); + fail("JMSException should be thrown"); + } + catch (JMSException t) + { + // TODO: assert error code and/or expected exception type + } + + // tests whether receiving and acknowledgment is working on a clean session + lastMessage = consumeMessages(); + lastMessage.acknowledge(); + } + + /** + * Tests that call to {@link Message#acknowledge()} after failover throws an exception in asynchronous consumer + * and we can consume messages after acknowledge. + */ + public void testAcknowledgeAfterFailoverForAsynchronousConsumer() throws Exception + { + init(Session.CLIENT_ACKNOWLEDGE, false); + FailoverTestMessageListener ml = new FailoverTestMessageListener(); + _consumer.setMessageListener(ml); + _connection.start(); + + produceMessages(); + + // wait for message receiving + ml.awaitForEnd(); + + assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter()); + + // assert messages + int counter = 0; + Message currentMessage = null; + for (Message message : ml.getReceivedMessages()) + { + assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++); + currentMessage = message; + } + ml.reset(); + + causeFailure(); + assertFailoverException(); + + + try + { + currentMessage.acknowledge(); + fail("JMSException should be thrown!"); + } + catch (JMSException e) + { + // TODO: assert error code and/or expected exception type + } + + resendMessagesIfNecessary(); + + // wait for message receiving + ml.awaitForEnd(); + + assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter()); + + // assert messages + counter = 0; + for (Message message : ml.getReceivedMessages()) + { + assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++); + currentMessage = message; + } + + // acknowledge again. It should be successful + currentMessage.acknowledge(); + } + + /** + * Test whether {@link Session#recover()} works as expected after failover + * in AA mode. + */ + public void testRecoverAfterFailoverInAutoAcknowledgeMode() throws Exception + { + init(Session.AUTO_ACKNOWLEDGE, true); + + produceMessages(); + + // receive first message in order to start a dispatcher thread + Message receivedMessage = _consumer.receive(1000l); + assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0); + + causeFailure(); + + assertFailoverException(); + + _consumerSession.recover(); + + resendMessagesIfNecessary(); + + // tests whether receiving is working after recover + consumeMessages(); + } + + public void testClientAcknowledgedSessionCloseAfterFailover() throws Exception + { + sessionCloseAfterFailoverImpl(Session.CLIENT_ACKNOWLEDGE); + } + + public void testTransactedSessionCloseAfterFailover() throws Exception + { + sessionCloseAfterFailoverImpl(Session.SESSION_TRANSACTED); + } + + public void testAutoAcknowledgedSessionCloseAfterFailover() throws Exception + { + sessionCloseAfterFailoverImpl(Session.AUTO_ACKNOWLEDGE); + } + + /** + * Tests {@link Session#close()} for session with given acknowledge mode + * to ensure that close works after failover. + * + * @param acknowledgeMode session acknowledge mode + * @throws JMSException + */ + private void sessionCloseAfterFailoverImpl(int acknowledgeMode) throws JMSException + { + init(acknowledgeMode, true); + produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false); + if (acknowledgeMode == Session.SESSION_TRANSACTED) + { + _producerSession.commit(); + } + + // intentionally receive message but do not commit or acknowledge it in + // case of transacted or CLIENT_ACK session + Message receivedMessage = _consumer.receive(1000l); + assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0); + + causeFailure(); + + assertFailoverException(); + + // for transacted/client_ack session + // no exception should be thrown but transaction should be automatically + // rolled back + _consumerSession.close(); + } + + /** + * A helper method to instantiate produce and consumer sessions, producer + * and consumer. + * + * @param acknowledgeMode + * acknowledge mode + * @param startConnection + * indicates whether connection should be started + * @throws JMSException + */ + private void init(int acknowledgeMode, boolean startConnection) throws JMSException + { + boolean isTransacted = acknowledgeMode == Session.SESSION_TRANSACTED ? true : false; + + _consumerSession = _connection.createSession(isTransacted, acknowledgeMode); + _destination = _consumerSession.createQueue(getTestQueueName() + "_" + System.currentTimeMillis()); + _consumer = _consumerSession.createConsumer(_destination); + + if (startConnection) + { + _connection.start(); + } + + _producerSession = _connection.createSession(isTransacted, acknowledgeMode); + _producer = _producerSession.createProducer(_destination); + + } + + /** + * Resends messages if reconnected to a non-clustered broker + * + * @throws JMSException + */ + private void resendMessagesIfNecessary() throws JMSException + { + if (!CLUSTERED) + { + // assert that a new broker does not have messages on a queue + if (_consumer.getMessageListener() == null) + { + Message message = _consumer.receive(100l); + assertNull("Received a message after failover with non-clustered broker!", message); + } + // re-sending messages if reconnected to a non-clustered broker + produceMessages(true); + } + } + + /** + * Produces a default number of messages with default text content into test + * queue + * + * @throws JMSException + */ + private void produceMessages() throws JMSException + { + produceMessages(false); + } + + private void produceMessages(boolean seperateProducer) throws JMSException + { + produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, seperateProducer); + } + + /** + * Consumes a default number of messages and asserts their content. + * + * @return last consumed message + * @throws JMSException + */ + private Message consumeMessages() throws JMSException + { + return consumeMessages(TEST_MESSAGE_FORMAT, _messageNumber); + } + + /** + * Produces given number of text messages with content matching given + * content pattern + * + * @param messagePattern message content pattern + * @param messageNumber number of messages to send + * @param standaloneProducer whether to use the existing producer or a new one. + * @throws JMSException + */ + private void produceMessages(String messagePattern, int messageNumber, boolean standaloneProducer) throws JMSException + { + Session producerSession; + MessageProducer producer; + + if(standaloneProducer) + { + producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED); + producer = producerSession.createProducer(_destination); + } + else + { + producerSession = _producerSession; + producer = _producer; + } + + for (int i = 0; i < messageNumber; i++) + { + String text = MessageFormat.format(messagePattern, i); + Message message = producerSession.createTextMessage(text); + producer.send(message); + } + + if(standaloneProducer) + { + producerSession.commit(); + } + } + + /** + * Consumes given number of text messages and asserts that their content + * matches given pattern + * + * @param messagePattern + * messages content pattern + * @param messageNumber + * message number to received + * @return last consumed message + * @throws JMSException + */ + private Message consumeMessages(String messagePattern, int messageNumber) throws JMSException + { + Message receivedMesssage = null; + for (int i = 0; i < messageNumber; i++) + { + receivedMesssage = _consumer.receive(1000l); + assertReceivedMessage(receivedMesssage, messagePattern, i); + } + return receivedMesssage; + } + + /** + * Asserts received message + * + * @param receivedMessage + * received message + * @param messagePattern + * messages content pattern + * @param messageIndex + * message index + */ + private void assertReceivedMessage(Message receivedMessage, String messagePattern, int messageIndex) + { + assertNotNull("Expected message [" + messageIndex + "] is not received!", receivedMessage); + assertTrue("Failure to receive message [" + messageIndex + "], expected TextMessage but received " + + receivedMessage, receivedMessage instanceof TextMessage); + String expectedText = MessageFormat.format(messagePattern, messageIndex); + String receivedText = null; + try + { + receivedText = ((TextMessage) receivedMessage).getText(); + } + catch (JMSException e) + { + fail("JMSException occured while getting message text:" + e.getMessage()); + } + assertEquals("Failover is broken! Expected [" + expectedText + "] but got [" + receivedText + "]", + expectedText, receivedText); + } + + /** + * Causes failover and waits till connection is re-established. + */ + private void causeFailure() + { + causeFailure(getFailingPort(), DEFAULT_FAILOVER_TIME * 2); + } + + /** + * Causes failover by stopping broker on given port and waits till + * connection is re-established during given time interval. + * + * @param port + * broker port + * @param delay + * time interval to wait for connection re-establishement + */ + private void causeFailure(int port, long delay) + { + failBroker(port); + + awaitForFailoverCompletion(delay); + } + + private void awaitForFailoverCompletion(long delay) + { + _logger.info("Awaiting Failover completion.."); + try + { + if (!_failoverComplete.await(delay, TimeUnit.MILLISECONDS)) + { + fail("Failover did not complete"); + } + } + catch (InterruptedException e) + { + fail("Test was interrupted:" + e.getMessage()); + } + } + + private void assertFailoverException() + { + // TODO: assert exception is received (once implemented) + // along with error code and/or expected exception type + } + + @Override + public void bytesSent(long count) + { + } + + @Override + public void bytesReceived(long count) + { + } + + @Override + public boolean preFailover(boolean redirect) + { + return true; + } + + @Override + public boolean preResubscribe() + { + return true; + } + + @Override + public void failoverComplete() + { + _failoverComplete.countDown(); + } + + @Override + public void onException(JMSException e) + { + _exceptionListenerException = e; + } + + private class FailoverTestMessageListener implements MessageListener + { + // message counter + private AtomicInteger _counter = new AtomicInteger(); + + private List _receivedMessage = new ArrayList(); + + private volatile CountDownLatch _endLatch; + + public FailoverTestMessageListener() throws JMSException + { + _endLatch = new CountDownLatch(1); + } + + @Override + public void onMessage(Message message) + { + _receivedMessage.add(message); + if (_counter.incrementAndGet() % _messageNumber == 0) + { + _endLatch.countDown(); + } + } + + public void reset() + { + _receivedMessage.clear(); + _endLatch = new CountDownLatch(1); + _counter.set(0); + } + + public List getReceivedMessages() + { + return _receivedMessage; + } + + public Object awaitForEnd() throws InterruptedException + { + return _endLatch.await((long) _messageNumber, TimeUnit.SECONDS); + } + + public int getMessageCounter() + { + return _counter.get(); + } + } +} -- cgit v1.2.1