summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java71
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java6
-rw-r--r--java/test-profiles/java-derby.testprofile3
3 files changed, 49 insertions, 31 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
index ae7e30c231..acc7d5a4c1 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
@@ -1,36 +1,37 @@
/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
package org.apache.qpid.test.unit.ack;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.util.FileUtils;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
-import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TransactionRolledBackException;
+import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -59,9 +60,9 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements Con
* failover took place
*
* @param transacted create a transacted session for this test
- * @param mode if not transacted what ack mode to use for this test
+ * @param mode if not transacted what ack mode to use for this test
* @throws Exception if a problem occured during test setup.
- */
+ */
@Override
protected void init(boolean transacted, int mode) throws Exception
{
@@ -69,27 +70,38 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements Con
((AMQConnection) _connection).setConnectionListener(this);
}
- protected void prepBroker(int count) throws Exception
+ protected void prepBroker(int index) throws Exception
{
- if (count % 2 == 1)
+ // If this is the last message then we can skip the prep.
+ if (index == NUM_MESSAGES)
+ {
+ return;
+ }
+
+ if (index % 2 == 0)
{
failBroker(getFailingPort());
+ // Clean up the failed broker
+ FileUtils.delete(new File(System.getProperty("QPID_WORK") + "/" + getFailingPort()), true);
}
else
{
failBroker(getPort());
+ // Clean up the failed broker
+ FileUtils.delete(new File(System.getProperty("QPID_WORK") + "/" + getPort()), true);
}
+ // Ensure we have the right data on the broker
Connection connection = getConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
// ensure destination is created.
session.createConsumer(_queue).close();
- sendMessage(session, _queue, count, NUM_MESSAGES - count, 0);
+ sendMessage(session, _queue, 1, index + 1, 0);
if (_consumerSession.getAcknowledgeMode() != AMQSession.NO_ACKNOWLEDGE)
{
- assertEquals("Wrong number of messages on queue", count,
+ assertEquals("Wrong number of messages on queue", 1,
((AMQSession) session).getQueueDepth((AMQDestination) _queue));
}
@@ -97,7 +109,7 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements Con
try
{
- if (count % 2 == 1)
+ if (index % 2 == 0)
{
startBroker(getFailingPort());
}
@@ -120,7 +132,7 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements Con
try
{
- prepBroker(NUM_MESSAGES - msg.getIntProperty(INDEX) - 1);
+ prepBroker(msg.getIntProperty(INDEX));
}
catch (Exception e)
{
@@ -132,14 +144,13 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements Con
/**
* Test that Acking/Committing a message received before failover causes
* an exception at commit/ack time.
- *
+ * <p/>
* Expected behaviour is that in:
* * tx mode commit() throws a transacted RolledBackException
* * client ack mode throws an IllegalStateException
*
* @param transacted is this session trasacted
* @param mode What ack mode should be used if not trasacted
- *
* @throws Exception if something goes wrong.
*/
protected void testDirtyAcking(boolean transacted, int mode) throws Exception
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
index 7c9a77eb53..36731107c5 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
@@ -99,6 +99,12 @@ public class AcknowledgeTest extends FailoverBaseCase
msg = _consumer.receive(1500);
}
+ if (_consumerSession.getTransacted())
+ {
+ //Acknowledge the last msg if we are testing transacted otherwise queueDepth will be 1
+ doAcknowlegement(msg);
+ }
+
assertEquals("Wrong number of messages on queue", 0,
((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue));
}
diff --git a/java/test-profiles/java-derby.testprofile b/java/test-profiles/java-derby.testprofile
index 689b2b4357..1238a44c84 100644
--- a/java/test-profiles/java-derby.testprofile
+++ b/java/test-profiles/java-derby.testprofile
@@ -1,8 +1,9 @@
broker.language=java
broker=${project.root}/build/bin/qpid-server -p @PORT -m @MPORT --exclude-0-10 @PORT -c @CONFIG_FILE -l ${test.profiles}/log4j-test.xml
-broker.clean=${test.profiles}/clean-dir ${build.data} ${project.root}/build/work/derbyDB
+broker.clean=${test.profiles}/clean-dir ${build.data} ${project.root}/build/work/*
broker.ready=BRK-1004
broker.stopped=Exception
broker.config=${project.root}/build/etc/config-systests-derby.xml
qpid.amqp.version=0-9
profile.excludes=JavaStandaloneExcludes
+broker.clean.between.tests=true