summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java237
1 files changed, 178 insertions, 59 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
index 3a1fb50725..b7ae911a49 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
@@ -21,36 +21,37 @@
package org.apache.qpid.test.client.failover;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQConnectionURL;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.test.utils.FailoverBaseCase;
-import org.apache.log4j.Logger;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
-import javax.jms.Queue;
import javax.naming.NamingException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession_0_10;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.FailoverBaseCase;
public class FailoverTest extends FailoverBaseCase implements ConnectionListener
{
private static final Logger _logger = Logger.getLogger(FailoverTest.class);
private static final String QUEUE = "queue";
- private static final int NUM_MESSAGES = 10;
- private Connection connnection;
+ private static final int DEFAULT_NUM_MESSAGES = 10;
+ private static final int DEFAULT_SEED = 20080921;
+ protected int numMessages = 0;
+ protected Connection connection;
private Session producerSession;
private Queue queue;
private MessageProducer producer;
@@ -61,26 +62,32 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
private CountDownLatch failoverComplete;
private static final long DEFAULT_FAILOVER_TIME = 10000L;
private boolean CLUSTERED = Boolean.getBoolean("profile.clustered");
-
+ private int seed;
+ private Random rand;
+
@Override
protected void setUp() throws Exception
{
super.setUp();
-
- connnection = getConnection();
- ((AMQConnection) connnection).setConnectionListener(this);
- connnection.start();
+
+ numMessages = Integer.getInteger("profile.failoverMsgCount",DEFAULT_NUM_MESSAGES);
+ seed = Integer.getInteger("profile.failoverRandomSeed",DEFAULT_SEED);
+ rand = new Random(seed);
+
+ connection = getConnection();
+ ((AMQConnection) connection).setConnectionListener(this);
+ connection.start();
failoverComplete = new CountDownLatch(1);
}
- private void init(boolean transacted, int mode) throws JMSException, NamingException
+ protected void init(boolean transacted, int mode) throws JMSException, NamingException
{
queue = (Queue) getInitialContext().lookup(QUEUE);
- consumerSession = connnection.createSession(transacted, mode);
+ consumerSession = connection.createSession(transacted, mode);
consumer = consumerSession.createConsumer(queue);
- producerSession = connnection.createSession(transacted, mode);
+ producerSession = connection.createSession(transacted, mode);
producer = producerSession.createProducer(queue);
}
@@ -89,7 +96,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
{
try
{
- connnection.close();
+ connection.close();
}
catch (Exception e)
{
@@ -99,26 +106,46 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
super.tearDown();
}
- private void consumeMessages(int toConsume, boolean transacted) throws JMSException
+ private void consumeMessages(int startIndex,int endIndex, boolean transacted) throws JMSException
{
Message msg;
- for (int i = 0; i < toConsume; i++)
+ _logger.debug("**************** Receive (Start: " + startIndex + ", End:" + endIndex + ")***********************");
+
+ for (int i = startIndex; i < endIndex; i++)
{
- msg = consumer.receive(1000);
+ msg = consumer.receive(1000);
assertNotNull("Message " + i + " was null!", msg);
- assertEquals("message " + i, ((TextMessage) msg).getText());
+
+ _logger.debug("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
+ _logger.debug("Received : " + ((TextMessage) msg).getText());
+ _logger.debug("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
+
+ assertEquals("Invalid message order","message " + i, ((TextMessage) msg).getText());
+
}
- if (transacted) {
+ _logger.debug("***********************************************************");
+
+ if (transacted)
+ {
consumerSession.commit();
}
}
- private void sendMessages(int totalMessages, boolean transacted) throws JMSException
+ private void sendMessages(int startIndex,int endIndex, boolean transacted) throws JMSException
{
- for (int i = 0; i < totalMessages; i++)
- {
+ _logger.debug("**************** Send (Start: " + startIndex + ", End:" + endIndex + ")***********************");
+
+ for (int i = startIndex; i < endIndex; i++)
+ {
producer.send(producerSession.createTextMessage("message " + i));
+
+ _logger.debug("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
+ _logger.debug("Sending message"+i);
+ _logger.debug("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
}
+
+ _logger.debug("***********************************************************");
+
if (transacted)
{
producerSession.commit();
@@ -127,47 +154,93 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
public void testP2PFailover() throws Exception
{
- testP2PFailover(NUM_MESSAGES, true, false);
+ testP2PFailover(numMessages, true,true, false);
}
- public void testP2PFailoverWithMessagesLeft() throws Exception
+ public void testP2PFailoverWithMessagesLeftToConsumeAndProduce() throws Exception
{
- testP2PFailover(NUM_MESSAGES, false, false);
+ if (CLUSTERED)
+ {
+ testP2PFailover(numMessages, false,false, false);
+ }
}
-
+
+ public void testP2PFailoverWithMessagesLeftToConsume() throws Exception
+ {
+ if (CLUSTERED)
+ {
+ testP2PFailover(numMessages, false,true, false);
+ }
+ }
+
public void testP2PFailoverTransacted() throws Exception
{
- testP2PFailover(NUM_MESSAGES, true, false);
+ testP2PFailover(numMessages, true,true, false);
}
- private void testP2PFailover(int totalMessages, boolean consumeAll, boolean transacted) throws JMSException, NamingException
+ public void testP2PFailoverTransactedWithMessagesLeftToConsumeAndProduce() throws Exception
{
- Message msg = null;
+ // Currently the cluster does not support transactions that span a failover
+ if (CLUSTERED)
+ {
+ testP2PFailover(numMessages, false,false, false);
+ }
+ }
+
+ private void testP2PFailover(int totalMessages, boolean consumeAll, boolean produceAll , boolean transacted) throws JMSException, NamingException
+ {
init(transacted, Session.AUTO_ACKNOWLEDGE);
- sendMessages(totalMessages, transacted);
+ runP2PFailover(totalMessages,consumeAll, produceAll , transacted);
+ }
+
+ protected void runP2PFailover(int totalMessages, boolean consumeAll, boolean produceAll , boolean transacted) throws JMSException, NamingException
+ {
+ Message msg = null;
+ int toProduce = totalMessages;
+
+ _logger.debug("===================================================================");
+ _logger.debug("Total messages used for the test " + totalMessages + " messages");
+ _logger.debug("===================================================================");
+
+ if (!produceAll)
+ {
+ toProduce = totalMessages - rand.nextInt(totalMessages);
+ }
+
+ _logger.debug("==================");
+ _logger.debug("Sending " + toProduce + " messages");
+ _logger.debug("==================");
+
+ sendMessages(0,toProduce, transacted);
// Consume some messages
- int toConsume = totalMessages;
+ int toConsume = toProduce;
if (!consumeAll)
{
- toConsume = totalMessages / 2;
+ toConsume = toProduce - rand.nextInt(toProduce);
}
+
+ consumeMessages(0,toConsume, transacted);
- consumeMessages(toConsume, transacted);
-
+ _logger.debug("==================");
+ _logger.debug("Consuming " + toConsume + " messages");
+ _logger.debug("==================");
+
_logger.info("Failing over");
causeFailure(DEFAULT_FAILOVER_TIME);
- if (!CLUSTERED)
- {
- msg = consumer.receive(500);
- assertNull("Should not have received message from new broker!", msg);
- }
-
- // Check that messages still sent / received
- sendMessages(totalMessages, transacted);
- consumeMessages(totalMessages, transacted);
+ // Check that you produce and consume the rest of messages.
+ _logger.debug("==================");
+ _logger.debug("Sending " + (totalMessages-toProduce) + " messages");
+ _logger.debug("==================");
+
+ sendMessages(toProduce,totalMessages, transacted);
+ consumeMessages(toConsume,totalMessages, transacted);
+
+ _logger.debug("==================");
+ _logger.debug("Consuming " + (totalMessages-toConsume) + " messages");
+ _logger.debug("==================");
}
private void causeFailure(long delay)
@@ -188,11 +261,11 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
//evil ignore IE.
}
}
-
+
public void testClientAckFailover() throws Exception
{
init(false, Session.CLIENT_ACKNOWLEDGE);
- sendMessages(1, false);
+ sendMessages(0,1, false);
Message msg = consumer.receive();
assertNotNull("Expected msgs not received", msg);
@@ -208,7 +281,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
failure = e;
}
assertNotNull("Exception should be thrown", failure);
- }
+ }
/**
* The client used to have a fixed timeout of 4 minutes after which failover would no longer work.
@@ -216,6 +289,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
*
* @throws Exception if something unexpected occurs in the test.
*/
+
public void test4MinuteFailover() throws Exception
{
ConnectionURL connectionURL = getConnectionFactory().getConnectionURL();
@@ -228,12 +302,12 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
details.setProperty(BrokerDetails.OPTIONS_RETRY, String.valueOf(RETRIES));
details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, String.valueOf(DELAY));
- connnection = new AMQConnection(connectionURL, null);
+ connection = new AMQConnection(connectionURL, null);
- ((AMQConnection) connnection).setConnectionListener(this);
+ ((AMQConnection) connection).setConnectionListener(this);
//Start the connection
- connnection.start();
+ connection.start();
long FAILOVER_DELAY = (RETRIES * DELAY);
@@ -247,6 +321,51 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
assertTrue("Failover did not take long enough", System.nanoTime() > failTime);
}
+
+ /**
+ * The idea is to run a failover test in a loop by failing over
+ * to the other broker each time.
+ */
+ public void testFailoverInALoop() throws Exception
+ {
+ if (!CLUSTERED)
+ {
+ return;
+ }
+
+ int iterations = Integer.getInteger("profile.failoverIterations",0);
+ boolean b = true;
+ int failingPort = getFailingPort();
+ init(false, Session.AUTO_ACKNOWLEDGE);
+ for (int i=0; i < iterations; i++)
+ {
+ _logger.debug("===================================================================");
+ _logger.debug("Failover In a loop : iteration number " + i);
+ _logger.debug("===================================================================");
+
+ runP2PFailover(numMessages, false,false, false);
+ startBroker(failingPort);
+ if (b)
+ {
+ failingPort = getFailingPort()-1;
+ b = false;
+ }
+ else
+ {
+ failingPort = getFailingPort()+1;
+ b = true;
+ }
+ setFailingPort(failingPort);
+ }
+ //To prevent any failover logic being initiaed when we shutdown the brokers.
+ connection.close();
+
+ // Shutdown the brokers
+ stopBroker(getFailingPort());
+ stopBroker(b?getFailingPort()+1 : getFailingPort()-1);
+
+ }
+
public void bytesSent(long count)
{
}