diff options
Diffstat (limited to 'java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java')
-rw-r--r-- | java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java | 174 |
1 files changed, 91 insertions, 83 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java index e7d7c7eba6..bf87e8e84f 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java @@ -21,14 +21,15 @@ package org.apache.qpid.test.client.failover; -import junit.framework.TestCase; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQConnectionURL; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.test.utils.FailoverBaseCase; import org.apache.log4j.Logger; import javax.jms.Connection; @@ -38,82 +39,66 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.Queue; +import javax.naming.NamingException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; -public class FailoverTest extends TestCase implements ConnectionListener +public class FailoverTest extends FailoverBaseCase implements ConnectionListener { private static final Logger _logger = Logger.getLogger(FailoverTest.class); - private static final int NUM_BROKERS = 2; - private static final String BROKER = "amqp://guest:guest@/test?brokerlist='vm://:%d;vm://:%d'"; private static final String QUEUE = "queue"; private static final int NUM_MESSAGES = 10; - private Connection con; - private AMQConnectionFactory conFactory; - private Session prodSess; - private AMQQueue q; - private MessageProducer prod; - private Session conSess; + private Connection connnection; + private Session producerSession; + private Queue queue; + private MessageProducer producer; + private Session consumerSession; private MessageConsumer consumer; private static int usedBrokers = 0; private CountDownLatch failoverComplete; + private static final long DEFAULT_FAILOVER_TIME = 10000L; @Override protected void setUp() throws Exception { super.setUp(); - // Create two VM brokers - for (int i = 0; i < NUM_BROKERS; i++) - { - usedBrokers++; - - TransportConnection.createVMBroker(usedBrokers); - } - - conFactory = new AMQConnectionFactory(String.format(BROKER, usedBrokers - 1, usedBrokers)); - _logger.info("Connecting on:" + conFactory.getConnectionURL()); - con = conFactory.createConnection(); - ((AMQConnection) con).setConnectionListener(this); - con.start(); + connnection = getConnection(); + ((AMQConnection) connnection).setConnectionListener(this); + connnection.start(); failoverComplete = new CountDownLatch(1); } - private void init(boolean transacted, int mode) throws JMSException + private void init(boolean transacted, int mode) throws JMSException, NamingException { - prodSess = con.createSession(transacted, mode); - q = new AMQQueue("amq.direct", QUEUE); - prod = prodSess.createProducer(q); - conSess = con.createSession(transacted, mode); - consumer = conSess.createConsumer(q); + queue = (Queue) getInitialContext().lookup(QUEUE); + + consumerSession = connnection.createSession(transacted, mode); + consumer = consumerSession.createConsumer(queue); + + producerSession = connnection.createSession(transacted, mode); + producer = producerSession.createProducer(queue); } @Override - protected void tearDown() throws Exception + public void tearDown() throws Exception { try { - con.close(); + connnection.close(); } catch (Exception e) { } - try - { - TransportConnection.killAllVMBrokers(); - ApplicationRegistry.removeAll(); - } - catch (Exception e) - { - fail("Unable to clean up"); - } super.tearDown(); } - private void consumeMessages(int toConsume) throws JMSException + private void consumeMessages(int toConsume, boolean transacted) throws JMSException { Message msg; for (int i = 0; i < toConsume; i++) @@ -122,40 +107,43 @@ public class FailoverTest extends TestCase implements ConnectionListener assertNotNull("Message " + i + " was null!", msg); assertEquals("message " + i, ((TextMessage) msg).getText()); } + if (transacted) { + consumerSession.commit(); + } } - private void sendMessages(int totalMessages) throws JMSException + private void sendMessages(int totalMessages, boolean transacted) throws JMSException { for (int i = 0; i < totalMessages; i++) { - prod.send(prodSess.createTextMessage("message " + i)); + producer.send(producerSession.createTextMessage("message " + i)); + } + if (transacted) + { + producerSession.commit(); } - -// try -// { -// Thread.sleep(100 * totalMessages); -// } -// catch (InterruptedException e) -// { -// //evil ignoring of IE -// } } public void testP2PFailover() throws Exception { - testP2PFailover(NUM_MESSAGES, true); + testP2PFailover(NUM_MESSAGES, true, false); } public void testP2PFailoverWithMessagesLeft() throws Exception { - testP2PFailover(NUM_MESSAGES, false); + testP2PFailover(NUM_MESSAGES, false, false); + } + + public void testP2PFailoverTransacted() throws Exception + { + testP2PFailover(NUM_MESSAGES, true, false); } - private void testP2PFailover(int totalMessages, boolean consumeAll) throws JMSException + private void testP2PFailover(int totalMessages, boolean consumeAll, boolean transacted) throws JMSException, NamingException { Message msg = null; - init(false, Session.AUTO_ACKNOWLEDGE); - sendMessages(totalMessages); + init(transacted, Session.AUTO_ACKNOWLEDGE); + sendMessages(totalMessages, transacted); // Consume some messages int toConsume = totalMessages; @@ -164,31 +152,29 @@ public class FailoverTest extends TestCase implements ConnectionListener toConsume = totalMessages / 2; } - consumeMessages(toConsume); + consumeMessages(toConsume, transacted); _logger.info("Failing over"); - causeFailure(); + causeFailure(DEFAULT_FAILOVER_TIME); msg = consumer.receive(500); - //todo: reinstate + assertNull("Should not have received message from new broker!", msg); // Check that messages still sent / received - sendMessages(totalMessages); - consumeMessages(totalMessages); + sendMessages(totalMessages, transacted); + consumeMessages(totalMessages, transacted); } - private void causeFailure() + private void causeFailure(long delay) { - _logger.info("Failover"); - TransportConnection.killVMBroker(usedBrokers - 1); - ApplicationRegistry.remove(usedBrokers - 1); + failBroker(); _logger.info("Awaiting Failover completion"); try { - failoverComplete.await(); + failoverComplete.await(delay, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { @@ -199,12 +185,11 @@ public class FailoverTest extends TestCase implements ConnectionListener public void testClientAckFailover() throws Exception { init(false, Session.CLIENT_ACKNOWLEDGE); - sendMessages(1); + sendMessages(1, false); Message msg = consumer.receive(); assertNotNull("Expected msgs not received", msg); - - causeFailure(); + causeFailure(DEFAULT_FAILOVER_TIME); Exception failure = null; try @@ -218,18 +203,41 @@ public class FailoverTest extends TestCase implements ConnectionListener assertNotNull("Exception should be thrown", failure); } - // This test disabled so that it doesn't add 4 minnutes to the length of time it takes to run, which would be lame - public void txest4MinuteFailover() throws Exception + /** + * The client used to have a fixed timeout of 4 minutes after which failover would no longer work. + * Check that this code has not regressed + * + * @throws Exception if something unexpected occurs in the test. + */ + public void test4MinuteFailover() throws Exception { - conFactory = new AMQConnectionFactory("amqp://guest:guest@/test?brokerlist='vm://:"+(usedBrokers-1)+"?connectdelay='60000'&retries='2''"); - _logger.info("Connecting on:" + conFactory.getConnectionURL()); - con = conFactory.createConnection(); - ((AMQConnection) con).setConnectionListener(this); - con.start(); - - long failTime = System.currentTimeMillis() + 60000; - causeFailure(); - assertTrue("Failover did not take long enough", System.currentTimeMillis() > failTime); + ConnectionURL connectionURL = getConnectionFactory().getConnectionURL(); + + int RETRIES = 4; + int DELAY = 60000; + + //Set up a long delay on and large number of retries + BrokerDetails details = connectionURL.getBrokerDetails(1); + details.setProperty(BrokerDetails.OPTIONS_RETRY, String.valueOf(RETRIES)); + details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, String.valueOf(DELAY)); + + connnection = new AMQConnection(connectionURL, null); + + ((AMQConnection) connnection).setConnectionListener(this); + + //Start the connection + connnection.start(); + + long FAILOVER_DELAY = (RETRIES * DELAY); + + // Use Nano seconds as it is more accurate for comparision. + long failTime = System.nanoTime() + FAILOVER_DELAY * 1000000; + + //Fail the first broker + causeFailure(FAILOVER_DELAY + DEFAULT_FAILOVER_TIME); + + //Reconnection should occur + assertTrue("Failover did not take long enough", System.nanoTime() > failTime); } public void bytesSent(long count) |