diff options
3 files changed, 49 insertions, 31 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java index ae7e30c231..acc7d5a4c1 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java @@ -1,36 +1,37 @@ /* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ package org.apache.qpid.test.unit.ack; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.util.FileUtils; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; -import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TransactionRolledBackException; +import java.io.File; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -59,9 +60,9 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements Con * failover took place * * @param transacted create a transacted session for this test - * @param mode if not transacted what ack mode to use for this test + * @param mode if not transacted what ack mode to use for this test * @throws Exception if a problem occured during test setup. - */ + */ @Override protected void init(boolean transacted, int mode) throws Exception { @@ -69,27 +70,38 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements Con ((AMQConnection) _connection).setConnectionListener(this); } - protected void prepBroker(int count) throws Exception + protected void prepBroker(int index) throws Exception { - if (count % 2 == 1) + // If this is the last message then we can skip the prep. + if (index == NUM_MESSAGES) + { + return; + } + + if (index % 2 == 0) { failBroker(getFailingPort()); + // Clean up the failed broker + FileUtils.delete(new File(System.getProperty("QPID_WORK") + "/" + getFailingPort()), true); } else { failBroker(getPort()); + // Clean up the failed broker + FileUtils.delete(new File(System.getProperty("QPID_WORK") + "/" + getPort()), true); } + // Ensure we have the right data on the broker Connection connection = getConnection(); Session session = connection.createSession(true, Session.SESSION_TRANSACTED); // ensure destination is created. session.createConsumer(_queue).close(); - sendMessage(session, _queue, count, NUM_MESSAGES - count, 0); + sendMessage(session, _queue, 1, index + 1, 0); if (_consumerSession.getAcknowledgeMode() != AMQSession.NO_ACKNOWLEDGE) { - assertEquals("Wrong number of messages on queue", count, + assertEquals("Wrong number of messages on queue", 1, ((AMQSession) session).getQueueDepth((AMQDestination) _queue)); } @@ -97,7 +109,7 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements Con try { - if (count % 2 == 1) + if (index % 2 == 0) { startBroker(getFailingPort()); } @@ -120,7 +132,7 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements Con try { - prepBroker(NUM_MESSAGES - msg.getIntProperty(INDEX) - 1); + prepBroker(msg.getIntProperty(INDEX)); } catch (Exception e) { @@ -132,14 +144,13 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements Con /** * Test that Acking/Committing a message received before failover causes * an exception at commit/ack time. - * + * <p/> * Expected behaviour is that in: * * tx mode commit() throws a transacted RolledBackException * * client ack mode throws an IllegalStateException * * @param transacted is this session trasacted * @param mode What ack mode should be used if not trasacted - * * @throws Exception if something goes wrong. */ protected void testDirtyAcking(boolean transacted, int mode) throws Exception diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java index 7c9a77eb53..36731107c5 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java @@ -99,6 +99,12 @@ public class AcknowledgeTest extends FailoverBaseCase msg = _consumer.receive(1500); } + if (_consumerSession.getTransacted()) + { + //Acknowledge the last msg if we are testing transacted otherwise queueDepth will be 1 + doAcknowlegement(msg); + } + assertEquals("Wrong number of messages on queue", 0, ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue)); } diff --git a/java/test-profiles/java-derby.testprofile b/java/test-profiles/java-derby.testprofile index 689b2b4357..1238a44c84 100644 --- a/java/test-profiles/java-derby.testprofile +++ b/java/test-profiles/java-derby.testprofile @@ -1,8 +1,9 @@ broker.language=java broker=${project.root}/build/bin/qpid-server -p @PORT -m @MPORT --exclude-0-10 @PORT -c @CONFIG_FILE -l ${test.profiles}/log4j-test.xml -broker.clean=${test.profiles}/clean-dir ${build.data} ${project.root}/build/work/derbyDB +broker.clean=${test.profiles}/clean-dir ${build.data} ${project.root}/build/work/* broker.ready=BRK-1004 broker.stopped=Exception broker.config=${project.root}/build/etc/config-systests-derby.xml qpid.amqp.version=0-9 profile.excludes=JavaStandaloneExcludes +broker.clean.between.tests=true |