summaryrefslogtreecommitdiff
path: root/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java185
1 files changed, 185 insertions, 0 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java b/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
index 526db29181..4b766864b4 100644
--- a/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
@@ -19,7 +19,13 @@
package org.apache.qpid.client.failover;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.test.utils.FailoverBaseCase;
@@ -36,10 +42,14 @@ import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
+import javax.naming.NamingException;
+
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Enumeration;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -760,6 +770,181 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
//got started, before allowing the test to tear down
awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
}
+
+ /**
+ * This test only tests 0-8/0-9/0-9-1 failover timeout
+ */
+ public void testFailoverHandlerTimeoutExpires() throws Exception
+ {
+ _connection.close();
+ setTestSystemProperty("qpid.failover_method_timeout", "10000");
+ AMQConnection connection = null;
+ try
+ {
+ connection = createConnectionWithFailover();
+
+ // holding failover mutex should prevent the failover from proceeding
+ synchronized(connection.getFailoverMutex())
+ {
+ killBroker();
+ startBroker();
+
+ // sleep interval exceeds failover timeout interval
+ Thread.sleep(11000l);
+ }
+
+ // allows the failover thread to proceed
+ Thread.yield();
+ assertFalse("Unexpected failover", _failoverComplete.await(2000l, TimeUnit.MILLISECONDS));
+ assertTrue("Failover should not succeed due to timeout", connection.isClosed());
+ }
+ finally
+ {
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }
+ }
+
+ public void testFailoverHandlerTimeoutReconnected() throws Exception
+ {
+ _connection.close();
+ setTestSystemProperty("qpid.failover_method_timeout", "10000");
+ AMQConnection connection = null;
+ try
+ {
+ connection = createConnectionWithFailover();
+
+ // holding failover mutex should prevent the failover from proceeding
+ synchronized(connection.getFailoverMutex())
+ {
+ killBroker();
+ startBroker();
+ }
+
+ // allows the failover thread to proceed
+ Thread.yield();
+ awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
+ assertFalse("Failover should restore connectivity", connection.isClosed());
+ }
+ finally
+ {
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }
+ }
+
+ /**
+ * Tests that the producer flow control flag is reset when failover occurs while
+ * the producers are being blocked by the broker.
+ *
+ * Uses Java broker specific queue configuration to enabled PSFC.
+ */
+ public void testFlowControlFlagResetOnFailover() throws Exception
+ {
+ // we do not need the connection failing to second broker
+ _connection.close();
+
+ // make sure that failover timeout is bigger than flow control timeout
+ setTestSystemProperty("qpid.failover_method_timeout", "60000");
+ setTestSystemProperty("qpid.flow_control_wait_failure", "10000");
+
+ AMQConnection connection = null;
+ try
+ {
+ connection = createConnectionWithFailover();
+
+ final Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+ final Queue queue = createAndBindQueueWithFlowControlEnabled(producerSession, getTestQueueName(), DEFAULT_MESSAGE_SIZE * 3, DEFAULT_MESSAGE_SIZE * 2);
+ final AtomicInteger counter = new AtomicInteger();
+ // try to send 5 messages (should block after 4)
+ new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ MessageProducer producer = producerSession.createProducer(queue);
+ for (int i=0; i < 5; i++)
+ {
+ Message next = createNextMessage(producerSession, i);
+ producer.send(next);
+ producerSession.commit();
+ counter.incrementAndGet();
+ }
+ }
+ catch(Exception e)
+ {
+ // ignore
+ }
+ }
+ }).start();
+
+ long limit= 30000l;
+ long start = System.currentTimeMillis();
+
+ // wait until session is blocked
+ while(!((AMQSession<?,?>)producerSession).isFlowBlocked() && System.currentTimeMillis() - start < limit)
+ {
+ Thread.sleep(100l);
+ }
+
+ assertTrue("Flow is not blocked", ((AMQSession<?, ?>) producerSession).isFlowBlocked());
+ // Message counter could be 3 or 4 depending on the progression of producing thread relative
+ // to the receipt of the ChannelFlow.
+ final int currentCounter = counter.get();
+ assertTrue("Unexpected number of sent messages", currentCounter == 3 || currentCounter == 4);
+
+ killBroker();
+ startBroker();
+
+ // allows the failover thread to proceed
+ Thread.yield();
+ awaitForFailoverCompletion(60000l);
+
+ assertFalse("Flow is blocked", ((AMQSession<?, ?>) producerSession).isFlowBlocked());
+ }
+ finally
+ {
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }
+ }
+
+ private Queue createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception
+ {
+ final Map<String, Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-capacity", capacity);
+ arguments.put("x-qpid-flow-resume-capacity", resumeCapacity);
+ ((AMQSession<?, ?>) session).createQueue(new AMQShortString(queueName), true, true, false, arguments);
+ Queue queue = session.createQueue("direct://amq.direct/" + queueName + "/" + queueName + "?durable='" + true
+ + "'&autodelete='" + true + "'");
+ ((AMQSession<?, ?>) session).declareAndBind((AMQDestination) queue);
+ return queue;
+ }
+
+ private AMQConnection createConnectionWithFailover() throws NamingException, JMSException
+ {
+ AMQConnection connection;
+ AMQConnectionFactory connectionFactory = (AMQConnectionFactory)getConnectionFactory("default");
+ ConnectionURL connectionURL = connectionFactory.getConnectionURL();
+ connectionURL.setOption(ConnectionURL.OPTIONS_FAILOVER, "singlebroker");
+ connectionURL.setOption(ConnectionURL.OPTIONS_FAILOVER_CYCLE, "2");
+ BrokerDetails details = connectionURL.getBrokerDetails(0);
+ details.setProperty(BrokerDetails.OPTIONS_RETRY, "200");
+ details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, "1000");
+
+ connection = (AMQConnection)connectionFactory.createConnection("admin", "admin");
+ connection.setConnectionListener(this);
+ return connection;
+ }
+
/**
* Tests {@link Session#close()} for session with given acknowledge mode
* to ensure that close works after failover.