diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
commit | d43d1912b376322e27fdcda551a73f9ff5487972 (patch) | |
tree | ce493e10baa95f44be8beb5778ce51783463196d /java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java | |
parent | 04877fec0c6346edec67072d7f2d247740cf2af5 (diff) | |
download | qpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz |
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java')
-rw-r--r-- | java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java | 185 |
1 files changed, 185 insertions, 0 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java b/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java index 526db29181..4b766864b4 100644 --- a/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java +++ b/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java @@ -19,7 +19,13 @@ package org.apache.qpid.client.failover; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.FailoverPolicy; import org.apache.qpid.test.utils.FailoverBaseCase; @@ -36,10 +42,14 @@ import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.TransactionRolledBackException; +import javax.naming.NamingException; + import java.text.MessageFormat; import java.util.ArrayList; import java.util.Enumeration; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -760,6 +770,181 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio //got started, before allowing the test to tear down awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME); } + + /** + * This test only tests 0-8/0-9/0-9-1 failover timeout + */ + public void testFailoverHandlerTimeoutExpires() throws Exception + { + _connection.close(); + setTestSystemProperty("qpid.failover_method_timeout", "10000"); + AMQConnection connection = null; + try + { + connection = createConnectionWithFailover(); + + // holding failover mutex should prevent the failover from proceeding + synchronized(connection.getFailoverMutex()) + { + killBroker(); + startBroker(); + + // sleep interval exceeds failover timeout interval + Thread.sleep(11000l); + } + + // allows the failover thread to proceed + Thread.yield(); + assertFalse("Unexpected failover", _failoverComplete.await(2000l, TimeUnit.MILLISECONDS)); + assertTrue("Failover should not succeed due to timeout", connection.isClosed()); + } + finally + { + if (connection != null) + { + connection.close(); + } + } + } + + public void testFailoverHandlerTimeoutReconnected() throws Exception + { + _connection.close(); + setTestSystemProperty("qpid.failover_method_timeout", "10000"); + AMQConnection connection = null; + try + { + connection = createConnectionWithFailover(); + + // holding failover mutex should prevent the failover from proceeding + synchronized(connection.getFailoverMutex()) + { + killBroker(); + startBroker(); + } + + // allows the failover thread to proceed + Thread.yield(); + awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME); + assertFalse("Failover should restore connectivity", connection.isClosed()); + } + finally + { + if (connection != null) + { + connection.close(); + } + } + } + + /** + * Tests that the producer flow control flag is reset when failover occurs while + * the producers are being blocked by the broker. + * + * Uses Java broker specific queue configuration to enabled PSFC. + */ + public void testFlowControlFlagResetOnFailover() throws Exception + { + // we do not need the connection failing to second broker + _connection.close(); + + // make sure that failover timeout is bigger than flow control timeout + setTestSystemProperty("qpid.failover_method_timeout", "60000"); + setTestSystemProperty("qpid.flow_control_wait_failure", "10000"); + + AMQConnection connection = null; + try + { + connection = createConnectionWithFailover(); + + final Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED); + final Queue queue = createAndBindQueueWithFlowControlEnabled(producerSession, getTestQueueName(), DEFAULT_MESSAGE_SIZE * 3, DEFAULT_MESSAGE_SIZE * 2); + final AtomicInteger counter = new AtomicInteger(); + // try to send 5 messages (should block after 4) + new Thread(new Runnable() + { + @Override + public void run() + { + try + { + MessageProducer producer = producerSession.createProducer(queue); + for (int i=0; i < 5; i++) + { + Message next = createNextMessage(producerSession, i); + producer.send(next); + producerSession.commit(); + counter.incrementAndGet(); + } + } + catch(Exception e) + { + // ignore + } + } + }).start(); + + long limit= 30000l; + long start = System.currentTimeMillis(); + + // wait until session is blocked + while(!((AMQSession<?,?>)producerSession).isFlowBlocked() && System.currentTimeMillis() - start < limit) + { + Thread.sleep(100l); + } + + assertTrue("Flow is not blocked", ((AMQSession<?, ?>) producerSession).isFlowBlocked()); + // Message counter could be 3 or 4 depending on the progression of producing thread relative + // to the receipt of the ChannelFlow. + final int currentCounter = counter.get(); + assertTrue("Unexpected number of sent messages", currentCounter == 3 || currentCounter == 4); + + killBroker(); + startBroker(); + + // allows the failover thread to proceed + Thread.yield(); + awaitForFailoverCompletion(60000l); + + assertFalse("Flow is blocked", ((AMQSession<?, ?>) producerSession).isFlowBlocked()); + } + finally + { + if (connection != null) + { + connection.close(); + } + } + } + + private Queue createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception + { + final Map<String, Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity", capacity); + arguments.put("x-qpid-flow-resume-capacity", resumeCapacity); + ((AMQSession<?, ?>) session).createQueue(new AMQShortString(queueName), true, true, false, arguments); + Queue queue = session.createQueue("direct://amq.direct/" + queueName + "/" + queueName + "?durable='" + true + + "'&autodelete='" + true + "'"); + ((AMQSession<?, ?>) session).declareAndBind((AMQDestination) queue); + return queue; + } + + private AMQConnection createConnectionWithFailover() throws NamingException, JMSException + { + AMQConnection connection; + AMQConnectionFactory connectionFactory = (AMQConnectionFactory)getConnectionFactory("default"); + ConnectionURL connectionURL = connectionFactory.getConnectionURL(); + connectionURL.setOption(ConnectionURL.OPTIONS_FAILOVER, "singlebroker"); + connectionURL.setOption(ConnectionURL.OPTIONS_FAILOVER_CYCLE, "2"); + BrokerDetails details = connectionURL.getBrokerDetails(0); + details.setProperty(BrokerDetails.OPTIONS_RETRY, "200"); + details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, "1000"); + + connection = (AMQConnection)connectionFactory.createConnection("admin", "admin"); + connection.setConnectionListener(this); + return connection; + } + /** * Tests {@link Session#close()} for session with given acknowledge mode * to ensure that close works after failover. |