summaryrefslogtreecommitdiff
path: root/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java256
1 files changed, 190 insertions, 66 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
index bf87e8e84f..c307176f3f 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
@@ -21,65 +21,70 @@
package org.apache.qpid.test.client.failover;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQConnectionURL;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.test.utils.FailoverBaseCase;
-import org.apache.log4j.Logger;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
-import javax.jms.Queue;
import javax.naming.NamingException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.FailoverBaseCase;
public class FailoverTest extends FailoverBaseCase implements ConnectionListener
{
private static final Logger _logger = Logger.getLogger(FailoverTest.class);
private static final String QUEUE = "queue";
- private static final int NUM_MESSAGES = 10;
- private Connection connnection;
+ private static final int DEFAULT_NUM_MESSAGES = 10;
+ private static final int DEFAULT_SEED = 20080921;
+ protected int numMessages = 0;
+ protected Connection connection;
private Session producerSession;
private Queue queue;
private MessageProducer producer;
private Session consumerSession;
private MessageConsumer consumer;
- private static int usedBrokers = 0;
private CountDownLatch failoverComplete;
- private static final long DEFAULT_FAILOVER_TIME = 10000L;
+ private boolean CLUSTERED = Boolean.getBoolean("profile.clustered");
+ private int seed;
+ private Random rand;
+ private int _currentPort = getFailingPort();
@Override
protected void setUp() throws Exception
{
super.setUp();
-
- connnection = getConnection();
- ((AMQConnection) connnection).setConnectionListener(this);
- connnection.start();
+
+ numMessages = Integer.getInteger("profile.failoverMsgCount",DEFAULT_NUM_MESSAGES);
+ seed = Integer.getInteger("profile.failoverRandomSeed",DEFAULT_SEED);
+ rand = new Random(seed);
+
+ connection = getConnection();
+ ((AMQConnection) connection).setConnectionListener(this);
+ connection.start();
failoverComplete = new CountDownLatch(1);
}
- private void init(boolean transacted, int mode) throws JMSException, NamingException
+ protected void init(boolean transacted, int mode) throws JMSException, NamingException
{
- queue = (Queue) getInitialContext().lookup(QUEUE);
-
- consumerSession = connnection.createSession(transacted, mode);
+ consumerSession = connection.createSession(transacted, mode);
+ queue = consumerSession.createQueue(getName()+System.currentTimeMillis());
consumer = consumerSession.createConsumer(queue);
- producerSession = connnection.createSession(transacted, mode);
+ producerSession = connection.createSession(transacted, mode);
producer = producerSession.createProducer(queue);
}
@@ -88,7 +93,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
{
try
{
- connnection.close();
+ connection.close();
}
catch (Exception e)
{
@@ -98,26 +103,46 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
super.tearDown();
}
- private void consumeMessages(int toConsume, boolean transacted) throws JMSException
+ private void consumeMessages(int startIndex,int endIndex, boolean transacted) throws JMSException
{
Message msg;
- for (int i = 0; i < toConsume; i++)
+ _logger.debug("**************** Receive (Start: " + startIndex + ", End:" + endIndex + ")***********************");
+
+ for (int i = startIndex; i < endIndex; i++)
{
- msg = consumer.receive(1000);
+ msg = consumer.receive(1000);
assertNotNull("Message " + i + " was null!", msg);
- assertEquals("message " + i, ((TextMessage) msg).getText());
+
+ _logger.debug("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
+ _logger.debug("Received : " + ((TextMessage) msg).getText());
+ _logger.debug("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
+
+ assertEquals("Invalid message order","message " + i, ((TextMessage) msg).getText());
+
}
- if (transacted) {
+ _logger.debug("***********************************************************");
+
+ if (transacted)
+ {
consumerSession.commit();
}
}
- private void sendMessages(int totalMessages, boolean transacted) throws JMSException
+ private void sendMessages(int startIndex,int endIndex, boolean transacted) throws JMSException
{
- for (int i = 0; i < totalMessages; i++)
- {
+ _logger.debug("**************** Send (Start: " + startIndex + ", End:" + endIndex + ")***********************");
+
+ for (int i = startIndex; i < endIndex; i++)
+ {
producer.send(producerSession.createTextMessage("message " + i));
+
+ _logger.debug("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
+ _logger.debug("Sending message"+i);
+ _logger.debug("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
}
+
+ _logger.debug("***********************************************************");
+
if (transacted)
{
producerSession.commit();
@@ -126,70 +151,122 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
public void testP2PFailover() throws Exception
{
- testP2PFailover(NUM_MESSAGES, true, false);
+ testP2PFailover(numMessages, true,true, false);
}
- public void testP2PFailoverWithMessagesLeft() throws Exception
+ public void testP2PFailoverWithMessagesLeftToConsumeAndProduce() throws Exception
{
- testP2PFailover(NUM_MESSAGES, false, false);
+ if (CLUSTERED)
+ {
+ testP2PFailover(numMessages, false,false, false);
+ }
}
-
+
+ public void testP2PFailoverWithMessagesLeftToConsume() throws Exception
+ {
+ if (CLUSTERED)
+ {
+ testP2PFailover(numMessages, false,true, false);
+ }
+ }
+
public void testP2PFailoverTransacted() throws Exception
{
- testP2PFailover(NUM_MESSAGES, true, false);
+ testP2PFailover(numMessages, true,true, false);
}
- private void testP2PFailover(int totalMessages, boolean consumeAll, boolean transacted) throws JMSException, NamingException
+ public void testP2PFailoverTransactedWithMessagesLeftToConsumeAndProduce() throws Exception
{
- Message msg = null;
+ // Currently the cluster does not support transactions that span a failover
+ if (CLUSTERED)
+ {
+ testP2PFailover(numMessages, false,false, false);
+ }
+ }
+
+ private void testP2PFailover(int totalMessages, boolean consumeAll, boolean produceAll , boolean transacted) throws JMSException, NamingException
+ {
init(transacted, Session.AUTO_ACKNOWLEDGE);
- sendMessages(totalMessages, transacted);
+ runP2PFailover(totalMessages,consumeAll, produceAll , transacted);
+ }
+
+ protected void runP2PFailover(int totalMessages, boolean consumeAll, boolean produceAll , boolean transacted) throws JMSException, NamingException
+ {
+ Message msg = null;
+ int toProduce = totalMessages;
+
+ _logger.debug("===================================================================");
+ _logger.debug("Total messages used for the test " + totalMessages + " messages");
+ _logger.debug("===================================================================");
+
+ if (!produceAll)
+ {
+ toProduce = totalMessages - rand.nextInt(totalMessages);
+ }
+
+ _logger.debug("==================");
+ _logger.debug("Sending " + toProduce + " messages");
+ _logger.debug("==================");
+
+ sendMessages(0,toProduce, transacted);
// Consume some messages
- int toConsume = totalMessages;
+ int toConsume = toProduce;
if (!consumeAll)
{
- toConsume = totalMessages / 2;
+ toConsume = toProduce - rand.nextInt(toProduce);
}
+
+ consumeMessages(0,toConsume, transacted);
- consumeMessages(toConsume, transacted);
-
+ _logger.debug("==================");
+ _logger.debug("Consuming " + toConsume + " messages");
+ _logger.debug("==================");
+
_logger.info("Failing over");
- causeFailure(DEFAULT_FAILOVER_TIME);
-
- msg = consumer.receive(500);
-
- assertNull("Should not have received message from new broker!", msg);
- // Check that messages still sent / received
- sendMessages(totalMessages, transacted);
- consumeMessages(totalMessages, transacted);
+ causeFailure(_currentPort, DEFAULT_FAILOVER_TIME);
+
+ // Check that you produce and consume the rest of messages.
+ _logger.debug("==================");
+ _logger.debug("Sending " + (totalMessages-toProduce) + " messages");
+ _logger.debug("==================");
+
+ sendMessages(toProduce,totalMessages, transacted);
+ consumeMessages(toConsume,totalMessages, transacted);
+
+ _logger.debug("==================");
+ _logger.debug("Consuming " + (totalMessages-toConsume) + " messages");
+ _logger.debug("==================");
}
- private void causeFailure(long delay)
+ private void causeFailure(int port, long delay)
{
- failBroker();
+ failBroker(port);
_logger.info("Awaiting Failover completion");
try
{
- failoverComplete.await(delay, TimeUnit.MILLISECONDS);
+ if (!failoverComplete.await(delay, TimeUnit.MILLISECONDS))
+ {
+ fail("failover did not complete");
+ }
}
catch (InterruptedException e)
{
//evil ignore IE.
}
}
-
+
public void testClientAckFailover() throws Exception
{
init(false, Session.CLIENT_ACKNOWLEDGE);
- sendMessages(1, false);
+ sendMessages(0,1, false);
Message msg = consumer.receive();
assertNotNull("Expected msgs not received", msg);
- causeFailure(DEFAULT_FAILOVER_TIME);
+ causeFailure(getFailingPort(), DEFAULT_FAILOVER_TIME);
Exception failure = null;
try
@@ -201,7 +278,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
failure = e;
}
assertNotNull("Exception should be thrown", failure);
- }
+ }
/**
* The client used to have a fixed timeout of 4 minutes after which failover would no longer work.
@@ -209,6 +286,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
*
* @throws Exception if something unexpected occurs in the test.
*/
+
public void test4MinuteFailover() throws Exception
{
ConnectionURL connectionURL = getConnectionFactory().getConnectionURL();
@@ -221,12 +299,12 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
details.setProperty(BrokerDetails.OPTIONS_RETRY, String.valueOf(RETRIES));
details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, String.valueOf(DELAY));
- connnection = new AMQConnection(connectionURL, null);
+ connection = new AMQConnection(connectionURL, null);
- ((AMQConnection) connnection).setConnectionListener(this);
+ ((AMQConnection) connection).setConnectionListener(this);
//Start the connection
- connnection.start();
+ connection.start();
long FAILOVER_DELAY = (RETRIES * DELAY);
@@ -234,12 +312,58 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
long failTime = System.nanoTime() + FAILOVER_DELAY * 1000000;
//Fail the first broker
- causeFailure(FAILOVER_DELAY + DEFAULT_FAILOVER_TIME);
+ causeFailure(getFailingPort(), FAILOVER_DELAY + DEFAULT_FAILOVER_TIME);
//Reconnection should occur
assertTrue("Failover did not take long enough", System.nanoTime() > failTime);
}
+
+ /**
+ * The idea is to run a failover test in a loop by failing over
+ * to the other broker each time.
+ */
+ public void testFailoverInALoop() throws Exception
+ {
+ if (!CLUSTERED)
+ {
+ return;
+ }
+
+ int iterations = Integer.getInteger("profile.failoverIterations",0);
+ boolean useAltPort = false;
+ int altPort = FAILING_PORT;
+ int stdPort = DEFAULT_PORT;
+ init(false, Session.AUTO_ACKNOWLEDGE);
+ for (int i=0; i < iterations; i++)
+ {
+ _logger.debug("===================================================================");
+ _logger.debug("Failover In a loop : iteration number " + i);
+ _logger.debug("===================================================================");
+
+ runP2PFailover(numMessages, false,false, false);
+ startBroker(_currentPort);
+ if (useAltPort)
+ {
+ _currentPort = altPort;
+ useAltPort = false;
+ }
+ else
+ {
+ _currentPort = stdPort;
+ useAltPort = true;
+ }
+
+ }
+ //To prevent any failover logic being initiated when we shutdown the brokers.
+ connection.close();
+
+ // Shutdown the brokers
+ stopBroker(altPort);
+ stopBroker(stdPort);
+
+ }
+
public void bytesSent(long count)
{
}