summaryrefslogtreecommitdiff
path: root/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java174
1 files changed, 91 insertions, 83 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
index e7d7c7eba6..bf87e8e84f 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
@@ -21,14 +21,15 @@
package org.apache.qpid.test.client.failover;
-import junit.framework.TestCase;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.test.utils.FailoverBaseCase;
import org.apache.log4j.Logger;
import javax.jms.Connection;
@@ -38,82 +39,66 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.jms.Queue;
+import javax.naming.NamingException;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
-public class FailoverTest extends TestCase implements ConnectionListener
+public class FailoverTest extends FailoverBaseCase implements ConnectionListener
{
private static final Logger _logger = Logger.getLogger(FailoverTest.class);
- private static final int NUM_BROKERS = 2;
- private static final String BROKER = "amqp://guest:guest@/test?brokerlist='vm://:%d;vm://:%d'";
private static final String QUEUE = "queue";
private static final int NUM_MESSAGES = 10;
- private Connection con;
- private AMQConnectionFactory conFactory;
- private Session prodSess;
- private AMQQueue q;
- private MessageProducer prod;
- private Session conSess;
+ private Connection connnection;
+ private Session producerSession;
+ private Queue queue;
+ private MessageProducer producer;
+ private Session consumerSession;
private MessageConsumer consumer;
private static int usedBrokers = 0;
private CountDownLatch failoverComplete;
+ private static final long DEFAULT_FAILOVER_TIME = 10000L;
@Override
protected void setUp() throws Exception
{
super.setUp();
- // Create two VM brokers
- for (int i = 0; i < NUM_BROKERS; i++)
- {
- usedBrokers++;
-
- TransportConnection.createVMBroker(usedBrokers);
- }
-
- conFactory = new AMQConnectionFactory(String.format(BROKER, usedBrokers - 1, usedBrokers));
- _logger.info("Connecting on:" + conFactory.getConnectionURL());
- con = conFactory.createConnection();
- ((AMQConnection) con).setConnectionListener(this);
- con.start();
+ connnection = getConnection();
+ ((AMQConnection) connnection).setConnectionListener(this);
+ connnection.start();
failoverComplete = new CountDownLatch(1);
}
- private void init(boolean transacted, int mode) throws JMSException
+ private void init(boolean transacted, int mode) throws JMSException, NamingException
{
- prodSess = con.createSession(transacted, mode);
- q = new AMQQueue("amq.direct", QUEUE);
- prod = prodSess.createProducer(q);
- conSess = con.createSession(transacted, mode);
- consumer = conSess.createConsumer(q);
+ queue = (Queue) getInitialContext().lookup(QUEUE);
+
+ consumerSession = connnection.createSession(transacted, mode);
+ consumer = consumerSession.createConsumer(queue);
+
+ producerSession = connnection.createSession(transacted, mode);
+ producer = producerSession.createProducer(queue);
}
@Override
- protected void tearDown() throws Exception
+ public void tearDown() throws Exception
{
try
{
- con.close();
+ connnection.close();
}
catch (Exception e)
{
}
- try
- {
- TransportConnection.killAllVMBrokers();
- ApplicationRegistry.removeAll();
- }
- catch (Exception e)
- {
- fail("Unable to clean up");
- }
super.tearDown();
}
- private void consumeMessages(int toConsume) throws JMSException
+ private void consumeMessages(int toConsume, boolean transacted) throws JMSException
{
Message msg;
for (int i = 0; i < toConsume; i++)
@@ -122,40 +107,43 @@ public class FailoverTest extends TestCase implements ConnectionListener
assertNotNull("Message " + i + " was null!", msg);
assertEquals("message " + i, ((TextMessage) msg).getText());
}
+ if (transacted) {
+ consumerSession.commit();
+ }
}
- private void sendMessages(int totalMessages) throws JMSException
+ private void sendMessages(int totalMessages, boolean transacted) throws JMSException
{
for (int i = 0; i < totalMessages; i++)
{
- prod.send(prodSess.createTextMessage("message " + i));
+ producer.send(producerSession.createTextMessage("message " + i));
+ }
+ if (transacted)
+ {
+ producerSession.commit();
}
-
-// try
-// {
-// Thread.sleep(100 * totalMessages);
-// }
-// catch (InterruptedException e)
-// {
-// //evil ignoring of IE
-// }
}
public void testP2PFailover() throws Exception
{
- testP2PFailover(NUM_MESSAGES, true);
+ testP2PFailover(NUM_MESSAGES, true, false);
}
public void testP2PFailoverWithMessagesLeft() throws Exception
{
- testP2PFailover(NUM_MESSAGES, false);
+ testP2PFailover(NUM_MESSAGES, false, false);
+ }
+
+ public void testP2PFailoverTransacted() throws Exception
+ {
+ testP2PFailover(NUM_MESSAGES, true, false);
}
- private void testP2PFailover(int totalMessages, boolean consumeAll) throws JMSException
+ private void testP2PFailover(int totalMessages, boolean consumeAll, boolean transacted) throws JMSException, NamingException
{
Message msg = null;
- init(false, Session.AUTO_ACKNOWLEDGE);
- sendMessages(totalMessages);
+ init(transacted, Session.AUTO_ACKNOWLEDGE);
+ sendMessages(totalMessages, transacted);
// Consume some messages
int toConsume = totalMessages;
@@ -164,31 +152,29 @@ public class FailoverTest extends TestCase implements ConnectionListener
toConsume = totalMessages / 2;
}
- consumeMessages(toConsume);
+ consumeMessages(toConsume, transacted);
_logger.info("Failing over");
- causeFailure();
+ causeFailure(DEFAULT_FAILOVER_TIME);
msg = consumer.receive(500);
- //todo: reinstate
+
assertNull("Should not have received message from new broker!", msg);
// Check that messages still sent / received
- sendMessages(totalMessages);
- consumeMessages(totalMessages);
+ sendMessages(totalMessages, transacted);
+ consumeMessages(totalMessages, transacted);
}
- private void causeFailure()
+ private void causeFailure(long delay)
{
- _logger.info("Failover");
- TransportConnection.killVMBroker(usedBrokers - 1);
- ApplicationRegistry.remove(usedBrokers - 1);
+ failBroker();
_logger.info("Awaiting Failover completion");
try
{
- failoverComplete.await();
+ failoverComplete.await(delay, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
{
@@ -199,12 +185,11 @@ public class FailoverTest extends TestCase implements ConnectionListener
public void testClientAckFailover() throws Exception
{
init(false, Session.CLIENT_ACKNOWLEDGE);
- sendMessages(1);
+ sendMessages(1, false);
Message msg = consumer.receive();
assertNotNull("Expected msgs not received", msg);
-
- causeFailure();
+ causeFailure(DEFAULT_FAILOVER_TIME);
Exception failure = null;
try
@@ -218,18 +203,41 @@ public class FailoverTest extends TestCase implements ConnectionListener
assertNotNull("Exception should be thrown", failure);
}
- // This test disabled so that it doesn't add 4 minnutes to the length of time it takes to run, which would be lame
- public void txest4MinuteFailover() throws Exception
+ /**
+ * The client used to have a fixed timeout of 4 minutes after which failover would no longer work.
+ * Check that this code has not regressed
+ *
+ * @throws Exception if something unexpected occurs in the test.
+ */
+ public void test4MinuteFailover() throws Exception
{
- conFactory = new AMQConnectionFactory("amqp://guest:guest@/test?brokerlist='vm://:"+(usedBrokers-1)+"?connectdelay='60000'&retries='2''");
- _logger.info("Connecting on:" + conFactory.getConnectionURL());
- con = conFactory.createConnection();
- ((AMQConnection) con).setConnectionListener(this);
- con.start();
-
- long failTime = System.currentTimeMillis() + 60000;
- causeFailure();
- assertTrue("Failover did not take long enough", System.currentTimeMillis() > failTime);
+ ConnectionURL connectionURL = getConnectionFactory().getConnectionURL();
+
+ int RETRIES = 4;
+ int DELAY = 60000;
+
+ //Set up a long delay on and large number of retries
+ BrokerDetails details = connectionURL.getBrokerDetails(1);
+ details.setProperty(BrokerDetails.OPTIONS_RETRY, String.valueOf(RETRIES));
+ details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, String.valueOf(DELAY));
+
+ connnection = new AMQConnection(connectionURL, null);
+
+ ((AMQConnection) connnection).setConnectionListener(this);
+
+ //Start the connection
+ connnection.start();
+
+ long FAILOVER_DELAY = (RETRIES * DELAY);
+
+ // Use Nano seconds as it is more accurate for comparision.
+ long failTime = System.nanoTime() + FAILOVER_DELAY * 1000000;
+
+ //Fail the first broker
+ causeFailure(FAILOVER_DELAY + DEFAULT_FAILOVER_TIME);
+
+ //Reconnection should occur
+ assertTrue("Failover did not take long enough", System.nanoTime() > failTime);
}
public void bytesSent(long count)