summaryrefslogtreecommitdiff
path: root/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java')
-rw-r--r--trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java306
1 files changed, 0 insertions, 306 deletions
diff --git a/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java b/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
deleted file mode 100644
index ae7e30c231..0000000000
--- a/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.ack;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.jms.ConnectionListener;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TransactionRolledBackException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- *
- */
-public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements ConnectionListener
-{
-
- protected CountDownLatch _failoverCompleted = new CountDownLatch(1);
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- // This must be even for the test to run correctly.
- // Otherwise we will kill the standby broker
- // not the one we are connected to.
- // The test will still pass but it will not be exactly
- // as described.
- NUM_MESSAGES = 6;
- }
-
- /**
- * Override default init to add connectionListener so we can verify that
- * failover took place
- *
- * @param transacted create a transacted session for this test
- * @param mode if not transacted what ack mode to use for this test
- * @throws Exception if a problem occured during test setup.
- */
- @Override
- protected void init(boolean transacted, int mode) throws Exception
- {
- super.init(transacted, mode);
- ((AMQConnection) _connection).setConnectionListener(this);
- }
-
- protected void prepBroker(int count) throws Exception
- {
- if (count % 2 == 1)
- {
- failBroker(getFailingPort());
- }
- else
- {
- failBroker(getPort());
- }
-
- Connection connection = getConnection();
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- // ensure destination is created.
- session.createConsumer(_queue).close();
-
- sendMessage(session, _queue, count, NUM_MESSAGES - count, 0);
-
- if (_consumerSession.getAcknowledgeMode() != AMQSession.NO_ACKNOWLEDGE)
- {
- assertEquals("Wrong number of messages on queue", count,
- ((AMQSession) session).getQueueDepth((AMQDestination) _queue));
- }
-
- connection.close();
-
- try
- {
- if (count % 2 == 1)
- {
- startBroker(getFailingPort());
- }
- else
- {
- startBroker(getPort());
- }
- }
- catch (Exception e)
- {
- fail("Unable to start failover broker," + e.getMessage());
- }
- }
-
- @Override
- public void doAcknowlegement(Message msg) throws JMSException
- {
- //Acknowledge current message
- super.doAcknowlegement(msg);
-
- try
- {
- prepBroker(NUM_MESSAGES - msg.getIntProperty(INDEX) - 1);
- }
- catch (Exception e)
- {
- fail("Unable to prep new broker," + e.getMessage());
- }
-
- }
-
- /**
- * Test that Acking/Committing a message received before failover causes
- * an exception at commit/ack time.
- *
- * Expected behaviour is that in:
- * * tx mode commit() throws a transacted RolledBackException
- * * client ack mode throws an IllegalStateException
- *
- * @param transacted is this session trasacted
- * @param mode What ack mode should be used if not trasacted
- *
- * @throws Exception if something goes wrong.
- */
- protected void testDirtyAcking(boolean transacted, int mode) throws Exception
- {
- NUM_MESSAGES = 2;
- //Test Dirty Failover Fails
- init(transacted, mode);
-
- _connection.start();
-
- Message msg = _consumer.receive(1500);
-
- int count = 0;
- assertNotNull("Message " + count + " not correctly received.", msg);
- assertEquals("Incorrect message received", count, msg.getIntProperty(INDEX));
-
- //Don't acknowledge just prep the next broker. Without changing count
- // Prep the new broker to have all all the messages so we can validate
- // that they can all be correctly received.
- try
- {
-
- //Stop the connection so we can validate the number of message count
- // on the queue is correct after failover
- _connection.stop();
- failBroker(getFailingPort());
-
- //Get the connection to the first (main port) broker.
- Connection connection = getConnection();//getConnectionFactory("connection1").getConnectionURL());
- // Use a transaction to send messages so we can be sure they arrive.
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- // ensure destination is created.
- session.createConsumer(_queue).close();
-
- sendMessage(session, _queue, NUM_MESSAGES);
-
- assertEquals("Wrong number of messages on queue", NUM_MESSAGES,
- ((AMQSession) session).getQueueDepth((AMQDestination) _queue));
-
- connection.close();
-
- //restart connection
- _connection.start();
- }
- catch (Exception e)
- {
- fail("Unable to prep new broker," + e.getMessage());
- }
-
- // Consume the next message - don't check what it is as a normal would
- // assume it is msg 1 but as we've fallen over it is msg 0 again.
- msg = _consumer.receive(1500);
-
- if (_consumerSession.getTransacted())
- {
- try
- {
- _consumerSession.commit();
- fail("Session is dirty we should get an TransactionRolledBackException");
- }
- catch (TransactionRolledBackException trbe)
- {
- //expected path
- }
- }
- else
- {
- try
- {
- msg.acknowledge();
- fail("Session is dirty we should get an IllegalStateException");
- }
- catch (javax.jms.IllegalStateException ise)
- {
- assertEquals("Incorrect Exception thrown", "has failed over", ise.getMessage());
- // Recover the sesion and try again.
- _consumerSession.recover();
- }
- }
-
- msg = _consumer.receive(1500);
- // Validate we now get the first message back
- assertEquals(0, msg.getIntProperty(INDEX));
-
- msg = _consumer.receive(1500);
- // and the second message
- assertEquals(1, msg.getIntProperty(INDEX));
-
- // And now verify that we can now commit the clean session
- if (_consumerSession.getTransacted())
- {
- _consumerSession.commit();
- }
- else
- {
- msg.acknowledge();
- }
-
- assertEquals("Wrong number of messages on queue", 0,
- ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue));
- }
-
- public void testDirtyClientAck() throws Exception
- {
- testDirtyAcking(false, Session.CLIENT_ACKNOWLEDGE);
- }
-
- public void testDirtyAckingTransacted() throws Exception
- {
- testDirtyAcking(true, Session.SESSION_TRANSACTED);
- }
-
- // AMQConnectionListener Interface.. used so we can validate that we
- // actually failed over.
-
- public void bytesSent(long count)
- {
- }
-
- public void bytesReceived(long count)
- {
- }
-
- public boolean preFailover(boolean redirect)
- {
- //Allow failover
- return true;
- }
-
- public boolean preResubscribe()
- {
- //Allow failover
- return true;
- }
-
- public void failoverComplete()
- {
- _failoverCompleted.countDown();
- }
-
- /**
- * Override so we can block until failover has completd
- *
- * @param port
- */
- @Override
- public void failBroker(int port)
- {
- super.failBroker(port);
-
- try
- {
- if (!_failoverCompleted.await(DEFAULT_FAILOVER_TIME, TimeUnit.MILLISECONDS))
- {
- fail("Failover did not occur in specified time:" + DEFAULT_FAILOVER_TIME);
- }
- }
- catch (InterruptedException e)
- {
- fail("Failover was interrupted");
- }
- }
-
-}