summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java338
1 files changed, 338 insertions, 0 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
new file mode 100644
index 0000000000..4c2758241e
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
@@ -0,0 +1,338 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.failover;
+
+import org.apache.mina.common.WriteTimeoutException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.FailoverBaseCase;
+import org.apache.qpid.AMQConnectionClosedException;
+
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test case based on user reported error.
+ *
+ * Summary:
+ * A user has reported message loss from their application. On bouncing of
+ * the broker the 'lost' messages are delivered to the broker.
+ *
+ * Note:
+ * The client was using Spring so that may influence the situation.
+ *
+ * Issue:
+ * The log files show 7 instances of the following which result in 7
+ * missing messages.
+ *
+ * The client log files show:
+ *
+ * The broker log file show:
+ *
+ *
+ * 7 missing messages have delivery tags 5-11. Which says that they are
+ * sequentially the next message from the broker.
+ *
+ * The only way for the 'without a handler' log to occur is if the consumer
+ * has been removed from the look up table of the dispatcher.
+ * And the only way for the 'null message' log to occur on the broker is is
+ * if the message does not exist in the unacked-map
+ *
+ * The consumer is only removed from the list during session
+ * closure and failover.
+ *
+ * If the session was closed then the broker would requeue the unacked
+ * messages so the potential exists to have an empty map but the broker
+ * will not send a message out after the unacked map has been cleared.
+ *
+ * When failover occurs the _consumer map is cleared and the consumers are
+ * resubscribed. This is down without first stopping any existing
+ * dispatcher so there exists the potential to receive a message after
+ * the _consumer map has been cleared which is how the 'without a handler'
+ * log statement occurs.
+ *
+ * Scenario:
+ *
+ * Looking over logs the sequence that best fits the events is as follows:
+ * - Something causes Mina to be delayed causing the WriteTimoutException.
+ * - This exception is recevied by AMQProtocolHandler#exceptionCaught
+ * - As the WriteTimeoutException is an IOException this will cause
+ * sessionClosed to be called to start failover.
+ * + This is potentially the issues here. All IOExceptions are treated
+ * as connection failure events.
+ * - Failover Runs
+ * + Failover assumes that the previous connection has been closed.
+ * + Failover binds the existing objects (AMQConnection/Session) to the
+ * new connection objects.
+ * - Everything is reported as being successfully failed over.
+ * However, what is neglected is that the original connection has not
+ * been closed.
+ * + So what occurs is that the broker sends a message to the consumer on
+ * the original connection, as it was not notified of the client
+ * failing over.
+ * As the client failover reuses the original AMQSession and Dispatcher
+ * the new messages the broker sends to the old consumer arrives at the
+ * client and is processed by the same AMQSession and Dispatcher.
+ * However, as the failover process cleared the _consumer map and
+ * resubscribe the consumers the Dispatcher does not recognise the
+ * delivery tag and so logs the 'without a handler' message.
+ * - The Dispatcher then attempts to reject the message, however,
+ * + The AMQSession/Dispatcher pair have been swapped to using a new Mina
+ * ProtocolSession as part of the failover process so the reject is
+ * sent down the second connection. The broker receives the Reject
+ * request but as the Message was sent on a different connection the
+ * unacknowledgemap is empty and a 'message is null' log message
+ * produced.
+ *
+ * Test Strategy:
+ *
+ * It should be easy to demonstrate if we can send an IOException to
+ * AMQProtocolHandler#exceptionCaught and then try sending a message.
+ *
+ * The current unknowns here are the type of consumers that are in use.
+ * If it was an exclusive queue(Durable Subscription) then why did the
+ * resubscribe not fail.
+ *
+ * If it was not exclusive then why did the messages not round robin?
+ */
+public class MessageDisappearWithIOExceptionTest extends FailoverBaseCase implements ConnectionListener
+{
+ private CountDownLatch _failoverOccured = new CountDownLatch(1);
+ AMQConnection _connection;
+ Session _session;
+ Queue _queue;
+ MessageConsumer _consumer;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ stopBroker(getFailingPort());
+
+ }
+
+ /**
+ * Test Summary:
+ *
+ * Create a queue consumer and send 10 messages to the broker.
+ *
+ * Consume the first message.
+ * This will pull the rest into the prefetch
+ *
+ * Send an IOException to the MinaProtocolHandler.
+ *
+ * This will force failover to occur.
+ *
+ * 9 messages would normally be expected but it is expected that none will
+ * arrive. As they are still in the prefetch of the first session.
+ *
+ * To free the messages we need to close all connections.
+ * - Simply doing connection.close() and retesting will not be enough as
+ * the original connection's IO layer will still exist and is nolonger
+ * connected to the connection object as a result of failover.
+ *
+ * - Test will need to retain a reference to the original connection IO so
+ * that it can be closed releasing the messages to validate that the
+ * messages have indeed been 'lost' on that sesssion.
+ */
+ public void test() throws Exception
+ {
+ initialiseConnection();
+
+ // Create Producer
+ // Send 10 messages
+ List<Message> messages = sendNumberedBytesMessage(_session, _queue, 10);
+
+ // Consume first messasge
+ Message received = _consumer.receive(2000);
+
+ // Verify received messages
+ assertNotNull("First message not received.", received);
+ assertEquals("Incorrect message Received",
+ messages.remove(0).getIntProperty("count"),
+ received.getIntProperty("count"));
+
+ // When the Exception is received by the underlying IO layer it will
+ // initiate failover. The first step of which is to ensure that the
+ // existing conection is closed. So in this situation the connection
+ // will be flushed casuing the above ACK to be sent to the broker.
+ //
+ // That said:
+ // when the socket close is detected on the server it will rise up the
+ // Mina filter chain and interrupt processing.
+ // this has been raised as QPID-2138
+ _session.createConsumer(_session.createTemporaryQueue()).close();
+
+ //Retain IO Layer
+ AMQProtocolSession protocolSession = _connection.getProtocolHandler().getProtocolSession();
+
+ // Send IO Exception - causing failover
+ _connection.getProtocolHandler().
+ exception(new WriteTimeoutException("WriteTimeoutException to cause failover."));
+
+ // Verify Failover occured through ConnectionListener
+ assertTrue("Failover did not occur",
+ _failoverOccured.await(4000, TimeUnit.MILLISECONDS));
+
+ /***********************************/
+ // This verifies that the bug has been resolved
+
+ // Attempt to consume again. Expect 9 messages
+ for (int count = 1; count < 10; count++)
+ {
+ received = _consumer.receive(2000);
+ assertNotNull("Expected message not received:" + count, received);
+ assertEquals(messages.remove(0).getIntProperty("count"),
+ received.getIntProperty("count"));
+ }
+
+ //Verify there are no more messages
+ received = _consumer.receive(1000);
+ assertNull("Message receieved when there should be none:" + received,
+ received);
+
+// /***********************************/
+// // This verifies that the bug exists
+//
+// // Attempt to consume remaining 9 messages.. Expecting NONE.
+// // receiving just one message should fail so no need to fail 9 times
+// received = _consumer.receive(1000);
+// assertNull("Message receieved when it should be null:" + received, received);
+//
+//// //Close the Connection which you would assume would free the messages
+//// _connection.close();
+////
+//// // Reconnect
+//// initialiseConnection();
+////
+//// // We should still be unable to receive messages
+//// received = _consumer.receive(1000);
+//// assertNull("Message receieved when it should be null:" + received, received);
+////
+//// _connection.close();
+//
+// // Close original IO layer. Expecting messages to be released
+// protocolSession.closeProtocolSession();
+//
+// // Reconnect and all should be good.
+//// initialiseConnection();
+//
+// // Attempt to consume again. Expect 9 messages
+// for (int count = 1; count < 10; count++)
+// {
+// received = _consumer.receive(2000);
+// assertNotNull("Expected message not received:" + count, received);
+// assertEquals(messages.remove(0).getIntProperty("count"),
+// received.getIntProperty("count"));
+// }
+//
+// //Verify there are no more messages
+// received = _consumer.receive(1000);
+// assertNull("Message receieved when there should be none:" + received,
+// received);
+ }
+
+ private void initialiseConnection()
+ throws Exception
+ {
+ //Create Connection using the default connection URL. i.e. not the Failover URL that would be used by default
+ _connection = (AMQConnection) getConnectionFactory("default").createConnection("guest", "guest");
+ // The default connection does not have any retries configured so
+ // Allow this connection to retry so that we can block on the failover.
+ // The alternative would be to use the getConnection() default. However,
+ // this would add additional complexity in the logging as a second
+ // broker is defined in that url. We do not need it for this test.
+ _connection.getFailoverPolicy().getCurrentMethod().setRetries(1);
+ _connection.setConnectionListener(this);
+
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _queue = _session.createQueue(getTestQueueName());
+
+ // Create Consumer
+ _consumer = _session.createConsumer(_queue);
+
+ //Start connection
+ _connection.start();
+ }
+
+ /** QpidTestCase back port to this release */
+
+ // modified from QTC as sendMessage is not testable.
+ // - should be renamed sendBlankBytesMessage
+ // - should be renamed sendNumberedBytesMessage
+ public List<Message> sendNumberedBytesMessage(Session session, Destination destination,
+ int count) throws Exception
+ {
+ List<Message> messages = new ArrayList<Message>(count);
+
+ MessageProducer producer = session.createProducer(destination);
+
+ for (int i = 0; i < count; i++)
+ {
+ Message next = session.createMessage();
+
+ next.setIntProperty("count", i);
+
+ producer.send(next);
+
+ messages.add(next);
+ }
+
+ producer.close();
+ return messages;
+ }
+
+ public void bytesSent(long count)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void bytesReceived(long count)
+ {
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ //Allow failover to occur
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ //Allow failover to occur
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ _failoverOccured.countDown();
+ }
+}