diff options
Diffstat (limited to 'trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java')
-rw-r--r-- | trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java | 338 |
1 files changed, 0 insertions, 338 deletions
diff --git a/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java b/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java deleted file mode 100644 index 863aa43d22..0000000000 --- a/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java +++ /dev/null @@ -1,338 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.failover; - -import org.apache.mina.common.WriteTimeoutException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.jms.ConnectionListener; -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.test.utils.FailoverBaseCase; -import org.apache.qpid.AMQConnectionClosedException; - -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -/** - * Test case based on user reported error. - * - * Summary: - * A user has reported message loss from their application. On bouncing of - * the broker the 'lost' messages are delivered to the broker. - * - * Note: - * The client was using Spring so that may influence the situation. - * - * Issue: - * The log files show 7 instances of the following which result in 7 - * missing messages. - * - * The client log files show: - * - * The broker log file show: - * - * - * 7 missing messages have delivery tags 5-11. Which says that they are - * sequentially the next message from the broker. - * - * The only way for the 'without a handler' log to occur is if the consumer - * has been removed from the look up table of the dispatcher. - * And the only way for the 'null message' log to occur on the broker is is - * if the message does not exist in the unacked-map - * - * The consumer is only removed from the list during session - * closure and failover. - * - * If the session was closed then the broker would requeue the unacked - * messages so the potential exists to have an empty map but the broker - * will not send a message out after the unacked map has been cleared. - * - * When failover occurs the _consumer map is cleared and the consumers are - * resubscribed. This is down without first stopping any existing - * dispatcher so there exists the potential to receive a message after - * the _consumer map has been cleared which is how the 'without a handler' - * log statement occurs. - * - * Scenario: - * - * Looking over logs the sequence that best fits the events is as follows: - * - Something causes Mina to be delayed causing the WriteTimoutException. - * - This exception is recevied by AMQProtocolHandler#exceptionCaught - * - As the WriteTimeoutException is an IOException this will cause - * sessionClosed to be called to start failover. - * + This is potentially the issues here. All IOExceptions are treated - * as connection failure events. - * - Failover Runs - * + Failover assumes that the previous connection has been closed. - * + Failover binds the existing objects (AMQConnection/Session) to the - * new connection objects. - * - Everything is reported as being successfully failed over. - * However, what is neglected is that the original connection has not - * been closed. - * + So what occurs is that the broker sends a message to the consumer on - * the original connection, as it was not notified of the client - * failing over. - * As the client failover reuses the original AMQSession and Dispatcher - * the new messages the broker sends to the old consumer arrives at the - * client and is processed by the same AMQSession and Dispatcher. - * However, as the failover process cleared the _consumer map and - * resubscribe the consumers the Dispatcher does not recognise the - * delivery tag and so logs the 'without a handler' message. - * - The Dispatcher then attempts to reject the message, however, - * + The AMQSession/Dispatcher pair have been swapped to using a new Mina - * ProtocolSession as part of the failover process so the reject is - * sent down the second connection. The broker receives the Reject - * request but as the Message was sent on a different connection the - * unacknowledgemap is empty and a 'message is null' log message - * produced. - * - * Test Strategy: - * - * It should be easy to demonstrate if we can send an IOException to - * AMQProtocolHandler#exceptionCaught and then try sending a message. - * - * The current unknowns here are the type of consumers that are in use. - * If it was an exclusive queue(Durable Subscription) then why did the - * resubscribe not fail. - * - * If it was not exclusive then why did the messages not round robin? - */ -public class MessageDisappearWithIOExceptionTest extends FailoverBaseCase implements ConnectionListener -{ - private CountDownLatch _failoverOccured = new CountDownLatch(1); - AMQConnection _connection; - Session _session; - Queue _queue; - MessageConsumer _consumer; - - public void setUp() throws Exception - { - super.setUp(); - stopBroker(getFailingPort()); - - } - - /** - * Test Summary: - * - * Create a queue consumer and send 10 messages to the broker. - * - * Consume the first message. - * This will pull the rest into the prefetch - * - * Send an IOException to the MinaProtocolHandler. - * - * This will force failover to occur. - * - * 9 messages would normally be expected but it is expected that none will - * arrive. As they are still in the prefetch of the first session. - * - * To free the messages we need to close all connections. - * - Simply doing connection.close() and retesting will not be enough as - * the original connection's IO layer will still exist and is nolonger - * connected to the connection object as a result of failover. - * - * - Test will need to retain a reference to the original connection IO so - * that it can be closed releasing the messages to validate that the - * messages have indeed been 'lost' on that sesssion. - */ - public void test() throws Exception - { - initialiseConnection(); - - // Create Producer - // Send 10 messages - List<Message> messages = sendNumberedBytesMessage(_session, _queue, 10); - - // Consume first messasge - Message received = _consumer.receive(2000); - - // Verify received messages - assertNotNull("First message not received.", received); - assertEquals("Incorrect message Received", - messages.remove(0).getIntProperty("count"), - received.getIntProperty("count")); - - // When the Exception is received by the underlying IO layer it will - // initiate failover. The first step of which is to ensure that the - // existing conection is closed. So in this situation the connection - // will be flushed casuing the above ACK to be sent to the broker. - // - // That said: - // when the socket close is detected on the server it will rise up the - // Mina filter chain and interrupt processing. - // this has been raised as QPID-2138 - _session.createConsumer(_session.createTemporaryQueue()).close(); - - //Retain IO Layer - AMQProtocolSession protocolSession = _connection.getProtocolHandler().getProtocolSession(); - - // Send IO Exception - causing failover - _connection.getProtocolHandler(). - exception(new WriteTimeoutException("WriteTimeoutException to cause failover.")); - - // Verify Failover occured through ConnectionListener - assertTrue("Failover did not occur", - _failoverOccured.await(4000, TimeUnit.MILLISECONDS)); - - /***********************************/ - // This verifies that the bug has been resolved - - // Attempt to consume again. Expect 9 messages - for (int count = 1; count < 10; count++) - { - received = _consumer.receive(2000); - assertNotNull("Expected message not received:" + count, received); - assertEquals(messages.remove(0).getIntProperty("count"), - received.getIntProperty("count")); - } - - //Verify there are no more messages - received = _consumer.receive(1000); - assertNull("Message receieved when there should be none:" + received, - received); - -// /***********************************/ -// // This verifies that the bug exists -// -// // Attempt to consume remaining 9 messages.. Expecting NONE. -// // receiving just one message should fail so no need to fail 9 times -// received = _consumer.receive(1000); -// assertNull("Message receieved when it should be null:" + received, received); -// -//// //Close the Connection which you would assume would free the messages -//// _connection.close(); -//// -//// // Reconnect -//// initialiseConnection(); -//// -//// // We should still be unable to receive messages -//// received = _consumer.receive(1000); -//// assertNull("Message receieved when it should be null:" + received, received); -//// -//// _connection.close(); -// -// // Close original IO layer. Expecting messages to be released -// protocolSession.closeProtocolSession(); -// -// // Reconnect and all should be good. -//// initialiseConnection(); -// -// // Attempt to consume again. Expect 9 messages -// for (int count = 1; count < 10; count++) -// { -// received = _consumer.receive(2000); -// assertNotNull("Expected message not received:" + count, received); -// assertEquals(messages.remove(0).getIntProperty("count"), -// received.getIntProperty("count")); -// } -// -// //Verify there are no more messages -// received = _consumer.receive(1000); -// assertNull("Message receieved when there should be none:" + received, -// received); - } - - private void initialiseConnection() - throws Exception - { - //Create Connection using the default connection URL. i.e. not the Failover URL that would be used by default - _connection = (AMQConnection) getConnection(getConnectionFactory("default").getConnectionURL()); - // The default connection does not have any retries configured so - // Allow this connection to retry so that we can block on the failover. - // The alternative would be to use the getConnection() default. However, - // this would add additional complexity in the logging as a second - // broker is defined in that url. We do not need it for this test. - _connection.getFailoverPolicy().getCurrentMethod().setRetries(1); - _connection.setConnectionListener(this); - - _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - _queue = _session.createQueue(getTestQueueName()); - - // Create Consumer - _consumer = _session.createConsumer(_queue); - - //Start connection - _connection.start(); - } - - /** QpidTestCase back port to this release */ - - // modified from QTC as sendMessage is not testable. - // - should be renamed sendBlankBytesMessage - // - should be renamed sendNumberedBytesMessage - public List<Message> sendNumberedBytesMessage(Session session, Destination destination, - int count) throws Exception - { - List<Message> messages = new ArrayList<Message>(count); - - MessageProducer producer = session.createProducer(destination); - - for (int i = 0; i < count; i++) - { - Message next = session.createMessage(); - - next.setIntProperty("count", i); - - producer.send(next); - - messages.add(next); - } - - producer.close(); - return messages; - } - - public void bytesSent(long count) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void bytesReceived(long count) - { - } - - public boolean preFailover(boolean redirect) - { - //Allow failover to occur - return true; - } - - public boolean preResubscribe() - { - //Allow failover to occur - return true; - } - - public void failoverComplete() - { - _failoverOccured.countDown(); - } -} |