summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-10-23 20:32:43 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-10-23 20:32:43 +0000
commitfa7a35e31c0eac1713801a632398b192af2e0ab2 (patch)
treef71eac0ee698940ef5e3cd84db08f5a21f148bfa
parenta7fe503beac48244dbbc06878821c603ca09acd3 (diff)
downloadqpid-python-fa7a35e31c0eac1713801a632398b192af2e0ab2.tar.gz
Merged from trunk up to 821829
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@829211 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/etc/log4j.xml4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java3
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java13
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java1
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java5
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java22
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java1
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java193
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java356
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java306
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java167
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java179
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/FailoverBeforeConsumingRecoverTest.java40
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java148
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java253
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTrasactedPubilshTest.java403
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java80
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java56
-rw-r--r--qpid/java/test-profiles/Excludes11
-rw-r--r--qpid/java/test-profiles/test-provider.properties5
22 files changed, 1954 insertions, 298 deletions
diff --git a/qpid/java/broker/etc/log4j.xml b/qpid/java/broker/etc/log4j.xml
index 8ca43ededd..4b71772a0e 100644
--- a/qpid/java/broker/etc/log4j.xml
+++ b/qpid/java/broker/etc/log4j.xml
@@ -87,6 +87,10 @@
<priority value="debug"/>
</category-->
+ <!-- Set the commons logging that the XML parser uses to WARN, it is very chatty at debug -->
+ <logger name="org.apache.commons">
+ <level value="WARN"/>
+ </logger>
<!-- Log all info events to file -->
<root>
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java
index 4e78b5b413..3d37412376 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java
@@ -27,6 +27,7 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.logging.RootMessageLoggerImpl;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.TestLogActor;
+import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.rawloggers.Log4jMessageLogger;
import org.apache.qpid.server.management.NoopManagedObjectRegistry;
import org.apache.qpid.server.plugins.PluginManager;
@@ -93,6 +94,8 @@ public class NullApplicationRegistry extends ApplicationRegistry
@Override
public void close() throws Exception
{
+ CurrentActor.set(new BrokerActor(_rootMessageLogger));
+
try
{
super.close();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index fa15df34ec..fc81e32e4d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -60,6 +60,7 @@ import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
+import javax.jms.TransactionRolledBackException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
@@ -777,8 +778,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
+ //Check that we are clean to commit.
+ if (_failedOverDirty)
+ {
+ rollback();
+
+ throw new TransactionRolledBackException("Connection failover has occured since last send. " +
+ "Forced rollback");
+ }
+
- // TGM FIXME: what about failover?
// Acknowledge all delivered messages
while (true)
{
@@ -1509,6 +1518,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
sendRecover();
+ markClean();
+
if (!isSuspended)
{
suspendChannel(false);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 2dfecc80ac..667785c441 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -779,6 +779,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
else
{
_session.addDeliveredMessage(msg.getDeliveryTag());
+ _session.markDirty();
}
break;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
index 67cda957fb..a3d015eadc 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
@@ -253,7 +253,7 @@ public abstract class BlockingWaiter<T>
}
else
{
- System.err.println("WARNING: new error arrived while old one not yet processed");
+ System.err.println("WARNING: new error '" + e == null ? "null" : e.getMessage() + "' arrived while old one not yet processed:" + _error.getMessage());
}
try
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java
index dfb5cde247..83f0f87bc5 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/DeepQueueConsumeWithSelector.java
@@ -63,7 +63,6 @@ import java.util.concurrent.TimeUnit;
*/
public class DeepQueueConsumeWithSelector extends QpidTestCase implements MessageListener
{
- private static final String INDEX = "index";
private static final int MESSAGE_COUNT = 10000;
private static final int BATCH_SIZE = MESSAGE_COUNT / 10;
@@ -129,9 +128,7 @@ public class DeepQueueConsumeWithSelector extends QpidTestCase implements Messag
@Override
public Message createNextMessage(Session session, int msgCount) throws JMSException
{
- Message message = session.createTextMessage("Message :" + msgCount);
-
- message.setIntProperty(INDEX, msgCount);
+ Message message = super.createNextMessage(session,msgCount);
if ((msgCount % BATCH_SIZE) == 0 )
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
index 2e625f95c0..f9cf48a2b1 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
@@ -497,7 +497,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
if (msgCount == failPoint)
{
- failBroker();
+ failBroker(getFailingPort());
}
}
@@ -529,7 +529,7 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
sendMessages("connection2", messages);
}
- failBroker();
+ failBroker(getFailingPort());
checkQueueDepth(messages);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
index dfc3bb7b42..c307176f3f 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
@@ -37,7 +37,6 @@ import javax.naming.NamingException;
import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.ConnectionURL;
@@ -58,13 +57,12 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
private Session consumerSession;
private MessageConsumer consumer;
- private static int usedBrokers = 0;
private CountDownLatch failoverComplete;
- private static final long DEFAULT_FAILOVER_TIME = 10000L;
private boolean CLUSTERED = Boolean.getBoolean("profile.clustered");
private int seed;
private Random rand;
-
+ private int _currentPort = getFailingPort();
+
@Override
protected void setUp() throws Exception
{
@@ -227,7 +225,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
_logger.info("Failing over");
- causeFailure(DEFAULT_FAILOVER_TIME);
+ causeFailure(_currentPort, DEFAULT_FAILOVER_TIME);
// Check that you produce and consume the rest of messages.
_logger.debug("==================");
@@ -242,10 +240,10 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
_logger.debug("==================");
}
- private void causeFailure(long delay)
+ private void causeFailure(int port, long delay)
{
- failBroker();
+ failBroker(port);
_logger.info("Awaiting Failover completion");
try
@@ -268,7 +266,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
Message msg = consumer.receive();
assertNotNull("Expected msgs not received", msg);
- causeFailure(DEFAULT_FAILOVER_TIME);
+ causeFailure(getFailingPort(), DEFAULT_FAILOVER_TIME);
Exception failure = null;
try
@@ -314,7 +312,7 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
long failTime = System.nanoTime() + FAILOVER_DELAY * 1000000;
//Fail the first broker
- causeFailure(FAILOVER_DELAY + DEFAULT_FAILOVER_TIME);
+ causeFailure(getFailingPort(), FAILOVER_DELAY + DEFAULT_FAILOVER_TIME);
//Reconnection should occur
assertTrue("Failover did not take long enough", System.nanoTime() > failTime);
@@ -344,15 +342,15 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
_logger.debug("===================================================================");
runP2PFailover(numMessages, false,false, false);
- startBroker(getFailingPort());
+ startBroker(_currentPort);
if (useAltPort)
{
- setFailingPort(altPort);
+ _currentPort = altPort;
useAltPort = false;
}
else
{
- setFailingPort(stdPort);
+ _currentPort = stdPort;
useAltPort = true;
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
index 1ec39bd1e0..a09589b121 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
@@ -33,6 +33,7 @@ import javax.jms.Session;
import junit.framework.Assert;
+import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java
new file mode 100644
index 0000000000..4b45a96c20
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/Acknowledge2ConsumersTest.java
@@ -0,0 +1,193 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.test.unit.ack;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.FailoverBaseCase;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+public class Acknowledge2ConsumersTest extends FailoverBaseCase
+{
+ protected static int NUM_MESSAGES = 100;
+ protected Connection _con;
+ protected Queue _queue;
+ private Session _producerSession;
+ private Session _consumerSession;
+ private MessageConsumer _consumerA;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _queue = (Queue) getInitialContext().lookup("queue");
+
+ //Create Producer put some messages on the queue
+ _con = getConnection();
+ }
+
+ private void init(boolean transacted, int mode) throws JMSException
+ {
+ _producerSession = _con.createSession(true, Session.SESSION_TRANSACTED);
+ _consumerSession = _con.createSession(transacted, mode);
+ _consumerA = _consumerSession.createConsumer(_queue);
+ _con.start();
+ }
+
+ /**
+ * Produces Messages that
+ *
+ * @param transacted
+ * @param mode
+ *
+ * @throws Exception
+ */
+ private void test2ConsumersAcking(boolean transacted, int mode) throws Exception
+ {
+ init(transacted, mode);
+
+ // These should all end up being prefetched by sessionA
+ sendMessage(_producerSession, _queue, NUM_MESSAGES / 2);
+
+ //Create a second consumer (consumerB) to consume some of the messages
+ MessageConsumer consumerB = _consumerSession.createConsumer(_queue);
+
+ // These messages should be roundrobined between A and B
+ sendMessage(_producerSession, _queue, NUM_MESSAGES / 2);
+
+ int count = 0;
+ //Use consumerB to receive messages it has
+ Message msg = consumerB.receive(1500);
+ while (msg != null)
+ {
+ if (mode == Session.CLIENT_ACKNOWLEDGE)
+ {
+ msg.acknowledge();
+ }
+ count++;
+ msg = consumerB.receive(1500);
+ }
+ if (transacted)
+ {
+ _consumerSession.commit();
+ }
+
+ // Close the consumers
+ _consumerA.close();
+ consumerB.close();
+
+ // and close the session to release any prefetched messages.
+ _consumerSession.close();
+ assertEquals("Wrong number of messages on queue", NUM_MESSAGES - count,
+ ((AMQSession) _producerSession).getQueueDepth((AMQDestination) _queue));
+
+ // Clean up messages that may be left on the queue
+ _consumerSession = _con.createSession(transacted, mode);
+ _consumerA = _consumerSession.createConsumer(_queue);
+ msg = _consumerA.receive(1500);
+ while (msg != null)
+ {
+ if (mode == Session.CLIENT_ACKNOWLEDGE)
+ {
+ msg.acknowledge();
+ }
+ msg = _consumerA.receive(1500);
+ }
+ _consumerA.close();
+ if (transacted)
+ {
+ _consumerSession.commit();
+ }
+ _consumerSession.close();
+ }
+
+ public void test2ConsumersAutoAck() throws Exception
+ {
+ test2ConsumersAcking(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ public void test2ConsumersClientAck() throws Exception
+ {
+ test2ConsumersAcking(false, Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ public void test2ConsumersTx() throws Exception
+ {
+ test2ConsumersAcking(true, Session.SESSION_TRANSACTED);
+ }
+
+
+
+//
+// /**
+// * Check that session level acknowledge does correctly ack all previous
+// * values. Send 3 messages(0,1,2) then ack 1 and 2. If session ack is
+// * working correctly then acking 1 will also ack 0. Acking 2 will not
+// * attempt to re-ack 0 and 1.
+// *
+// * @throws Exception
+// */
+// public void testSessionAck() throws Exception
+// {
+// init(false, Session.CLIENT_ACKNOWLEDGE);
+//
+// sendMessage(_producerSession, _queue, 3);
+// Message msg;
+//
+// // Drop msg 0
+// _consumerA.receive(RECEIVE_TIMEOUT);
+//
+// // Take msg 1
+// msg = _consumerA.receive(RECEIVE_TIMEOUT);
+//
+// assertNotNull("Message 1 not correctly received.", msg);
+// assertEquals("Incorrect message received", 1, msg.getIntProperty(INDEX));
+//
+// // This should also ack msg 0
+// msg.acknowledge();
+//
+// // Take msg 2
+// msg = _consumerA.receive(RECEIVE_TIMEOUT);
+//
+// assertNotNull("Message 2 not correctly received.", msg);
+// assertEquals("Incorrect message received", 2, msg.getIntProperty(INDEX));
+//
+// // This should just ack msg 2
+// msg.acknowledge();
+//
+// _consumerA.close();
+// _consumerSession.close();
+//
+// assertEquals("Queue not empty.", 0,
+// ((AMQSession) _producerSession).getQueueDepth((AMQDestination) _queue));
+// _con.close();
+//
+//
+// }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java
new file mode 100644
index 0000000000..f22a405fc3
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java
@@ -0,0 +1,356 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.ack;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.jms.ConnectionListener;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TransactionRolledBackException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessageTest implements ConnectionListener
+{
+
+ protected CountDownLatch _failoverCompleted = new CountDownLatch(1);
+ private MessageListener _listener = null;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ NUM_MESSAGES = 10;
+ }
+
+ /**
+ * Override default init to add connectionListener so we can verify that
+ * failover took place
+ *
+ * @param transacted create a transacted session for this test
+ * @param mode if not transacted what ack mode to use for this test
+ *
+ * @throws Exception if a problem occured during test setup.
+ */
+ @Override
+ public void init(boolean transacted, int mode) throws Exception
+ {
+ super.init(transacted, mode);
+ ((AMQConnection) _connection).setConnectionListener(this);
+ // Override the listener for the dirtyAck testing.
+ if (_listener != null)
+ {
+ _consumer.setMessageListener(_listener);
+ }
+ }
+
+ protected void prepBroker(int count) throws Exception
+ {
+ //Stop the connection whilst we repopulate the broker, or the no_ack
+ // test will drain the msgs before we can check we put the right number
+ // back on again.
+// _connection.stop();
+
+ Connection connection = getConnection();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ // ensure destination is created.
+ session.createConsumer(_queue).close();
+
+ sendMessage(session, _queue, count, NUM_MESSAGES - count, 0);
+
+ if (_consumerSession.getAcknowledgeMode() != AMQSession.NO_ACKNOWLEDGE)
+ {
+ assertEquals("Wrong number of messages on queue", count,
+ ((AMQSession) session).getQueueDepth((AMQDestination) _queue));
+ }
+
+ connection.close();
+
+// _connection.start();
+ }
+
+ @Override
+ public void doAcknowlegement(Message msg) throws JMSException
+ {
+ //Acknowledge current message
+ super.doAcknowlegement(msg);
+
+ int msgCount = msg.getIntProperty(INDEX);
+
+ if (msgCount % 2 == 0)
+ {
+ failBroker(getFailingPort());
+ }
+ else
+ {
+ failBroker(getPort());
+ }
+
+ try
+ {
+ prepBroker(NUM_MESSAGES - msgCount - 1);
+ }
+ catch (Exception e)
+ {
+ fail("Unable to prep new broker," + e.getMessage());
+ }
+
+ try
+ {
+
+ if (msgCount % 2 == 0)
+ {
+ startBroker(getFailingPort());
+ }
+ else
+ {
+ startBroker(getPort());
+ }
+ }
+ catch (Exception e)
+ {
+ fail("Unable to start failover broker," + e.getMessage());
+ }
+
+ }
+
+ int msgCount = 0;
+ boolean cleaned = false;
+
+ class DirtyAckingHandler implements MessageListener
+ {
+ /**
+ * Validate first message but do nothing with it.
+ *
+ * Failover
+ *
+ * The receive the message again
+ *
+ * @param message
+ */
+ public void onMessage(Message message)
+ {
+ // Stop processing if we have an error and had to stop running.
+ if (_receviedAll.getCount() == 0)
+ {
+ _logger.debug("Dumping msgs due to error(" + _causeOfFailure.get().getMessage() + "):" + message);
+ return;
+ }
+
+ try
+ {
+ // Check we have the next message as expected
+ assertNotNull("Message " + msgCount + " not correctly received.", message);
+ assertEquals("Incorrect message received", msgCount, message.getIntProperty(INDEX));
+
+ if (msgCount == 0 && _failoverCompleted.getCount() != 0)
+ {
+ // This is the first message we've received so lets fail the broker
+
+ failBroker(getFailingPort());
+
+ repopulateBroker();
+
+ _logger.error("Received first msg so failing over");
+
+ return;
+ }
+
+ msgCount++;
+
+ // Don't acknowlege the first message after failover so we can commit
+ // them together
+ if (msgCount == 1)
+ {
+ _logger.error("Received first msg after failover ignoring:" + msgCount);
+
+ // Acknowledge the first message if we are now on the cleaned pass
+ if (cleaned)
+ {
+ _receviedAll.countDown();
+ }
+
+ return;
+ }
+
+ if (_consumerSession.getTransacted())
+ {
+ try
+ {
+ _consumerSession.commit();
+ if (!cleaned)
+ {
+ fail("Session is dirty we should get an TransactionRolledBackException");
+ }
+ }
+ catch (TransactionRolledBackException trbe)
+ {
+ //expected path
+ }
+ }
+ else
+ {
+ try
+ {
+ message.acknowledge();
+ if (!cleaned)
+ {
+ fail("Session is dirty we should get an IllegalStateException");
+ }
+ }
+ catch (javax.jms.IllegalStateException ise)
+ {
+ assertEquals("Incorrect Exception thrown", "has failed over", ise.getMessage());
+ // Recover the sesion and try again.
+ _consumerSession.recover();
+ }
+ }
+
+ // Acknowledge the last message if we are in a clean state
+ // this will then trigger test teardown.
+ if (cleaned)
+ {
+ _receviedAll.countDown();
+ }
+
+ //Reset message count so we can try again.
+ msgCount = 0;
+ cleaned = true;
+ }
+ catch (Exception e)
+ {
+ // If something goes wrong stop and notifiy main thread.
+ fail(e);
+ }
+ }
+ }
+
+ /**
+ * Test that Acking/Committing a message received before failover causes
+ * an exception at commit/ack time.
+ *
+ * Expected behaviour is that in:
+ * * tx mode commit() throws a transacted RolledBackException
+ * * client ack mode throws an IllegalStateException
+ *
+ * @param transacted is this session trasacted
+ * @param mode What ack mode should be used if not trasacted
+ *
+ * @throws Exception if something goes wrong.
+ */
+ protected void testDirtyAcking(boolean transacted, int mode) throws Exception
+ {
+ NUM_MESSAGES = 2;
+ _listener = new DirtyAckingHandler();
+
+ super.testAcking(transacted, mode);
+ }
+
+ public void testDirtyClientAck() throws Exception
+ {
+ testDirtyAcking(false, Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ public void testDirtyAckingTransacted() throws Exception
+ {
+ testDirtyAcking(true, Session.SESSION_TRANSACTED);
+ }
+
+ private void repopulateBroker() throws Exception
+ {
+ // Repopulate this new broker so we can test what happends after failover
+
+ //Get the connection to the first (main port) broker.
+ Connection connection = getConnection();
+ // Use a transaction to send messages so we can be sure they arrive.
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ // ensure destination is created.
+ session.createConsumer(_queue).close();
+
+ sendMessage(session, _queue, NUM_MESSAGES);
+
+ assertEquals("Wrong number of messages on queue", NUM_MESSAGES,
+ ((AMQSession) session).getQueueDepth((AMQDestination) _queue));
+
+ connection.close();
+ }
+
+ // AMQConnectionListener Interface.. used so we can validate that we
+ // actually failed over.
+
+ public void bytesSent(long count)
+ {
+ }
+
+ public void bytesReceived(long count)
+ {
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ //Allow failover
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ //Allow failover
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ _failoverCompleted.countDown();
+ }
+
+ /**
+ * Override so we can block until failover has completd
+ *
+ * @param port
+ */
+ @Override
+ public void failBroker(int port)
+ {
+ super.failBroker(port);
+
+ try
+ {
+ if (!_failoverCompleted.await(DEFAULT_FAILOVER_TIME, TimeUnit.MILLISECONDS))
+ {
+ // Use an exception so that we use our local fail() that notifies the main thread of failure
+ throw new Exception("Failover did not occur in specified time:" + DEFAULT_FAILOVER_TIME);
+ }
+
+ }
+ catch (Exception e)
+ {
+ // Use an exception so that we use our local fail() that notifies the main thread of failure
+ fail(e);
+ }
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
new file mode 100644
index 0000000000..30cc48691f
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
@@ -0,0 +1,306 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.ack;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.jms.ConnectionListener;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TransactionRolledBackException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements ConnectionListener
+{
+
+ protected CountDownLatch _failoverCompleted = new CountDownLatch(1);
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ // This must be even for the test to run correctly.
+ // Otherwise we will kill the standby broker
+ // not the one we are connected to.
+ // The test will still pass but it will not be exactly
+ // as described.
+ NUM_MESSAGES = 6;
+ }
+
+ /**
+ * Override default init to add connectionListener so we can verify that
+ * failover took place
+ *
+ * @param transacted create a transacted session for this test
+ * @param mode if not transacted what ack mode to use for this test
+ * @throws Exception if a problem occured during test setup.
+ */
+ @Override
+ protected void init(boolean transacted, int mode) throws Exception
+ {
+ super.init(transacted, mode);
+ ((AMQConnection) _connection).setConnectionListener(this);
+ }
+
+ protected void prepBroker(int count) throws Exception
+ {
+ if (count % 2 == 1)
+ {
+ failBroker(getFailingPort());
+ }
+ else
+ {
+ failBroker(getPort());
+ }
+
+ Connection connection = getConnection();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ // ensure destination is created.
+ session.createConsumer(_queue).close();
+
+ sendMessage(session, _queue, count, NUM_MESSAGES - count, 0);
+
+ if (_consumerSession.getAcknowledgeMode() != AMQSession.NO_ACKNOWLEDGE)
+ {
+ assertEquals("Wrong number of messages on queue", count,
+ ((AMQSession) session).getQueueDepth((AMQDestination) _queue));
+ }
+
+ connection.close();
+
+ try
+ {
+ if (count % 2 == 1)
+ {
+ startBroker(getFailingPort());
+ }
+ else
+ {
+ startBroker(getPort());
+ }
+ }
+ catch (Exception e)
+ {
+ fail("Unable to start failover broker," + e.getMessage());
+ }
+ }
+
+ @Override
+ public void doAcknowlegement(Message msg) throws JMSException
+ {
+ //Acknowledge current message
+ super.doAcknowlegement(msg);
+
+ try
+ {
+ prepBroker(NUM_MESSAGES - msg.getIntProperty(INDEX) - 1);
+ }
+ catch (Exception e)
+ {
+ fail("Unable to prep new broker," + e.getMessage());
+ }
+
+ }
+
+ /**
+ * Test that Acking/Committing a message received before failover causes
+ * an exception at commit/ack time.
+ *
+ * Expected behaviour is that in:
+ * * tx mode commit() throws a transacted RolledBackException
+ * * client ack mode throws an IllegalStateException
+ *
+ * @param transacted is this session trasacted
+ * @param mode What ack mode should be used if not trasacted
+ *
+ * @throws Exception if something goes wrong.
+ */
+ protected void testDirtyAcking(boolean transacted, int mode) throws Exception
+ {
+ NUM_MESSAGES = 2;
+ //Test Dirty Failover Fails
+ init(transacted, mode);
+
+ _connection.start();
+
+ Message msg = _consumer.receive(1500);
+
+ int count = 0;
+ assertNotNull("Message " + count + " not correctly received.", msg);
+ assertEquals("Incorrect message received", count, msg.getIntProperty(INDEX));
+
+ //Don't acknowledge just prep the next broker. Without changing count
+ // Prep the new broker to have all all the messages so we can validate
+ // that they can all be correctly received.
+ try
+ {
+
+ //Stop the connection so we can validate the number of message count
+ // on the queue is correct after failover
+ _connection.stop();
+ failBroker(getFailingPort());
+
+ //Get the connection to the first (main port) broker.
+ Connection connection = getConnection();//getConnectionFactory("connection1").getConnectionURL());
+ // Use a transaction to send messages so we can be sure they arrive.
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ // ensure destination is created.
+ session.createConsumer(_queue).close();
+
+ sendMessage(session, _queue, NUM_MESSAGES);
+
+ assertEquals("Wrong number of messages on queue", NUM_MESSAGES,
+ ((AMQSession) session).getQueueDepth((AMQDestination) _queue));
+
+ connection.close();
+
+ //restart connection
+ _connection.start();
+ }
+ catch (Exception e)
+ {
+ fail("Unable to prep new broker," + e.getMessage());
+ }
+
+ // Consume the next message - don't check what it is as a normal would
+ // assume it is msg 1 but as we've fallen over it is msg 0 again.
+ msg = _consumer.receive(1500);
+
+ if (_consumerSession.getTransacted())
+ {
+ try
+ {
+ _consumerSession.commit();
+ fail("Session is dirty we should get an TransactionRolledBackException");
+ }
+ catch (TransactionRolledBackException trbe)
+ {
+ //expected path
+ }
+ }
+ else
+ {
+ try
+ {
+ msg.acknowledge();
+ fail("Session is dirty we should get an IllegalStateException");
+ }
+ catch (javax.jms.IllegalStateException ise)
+ {
+ assertEquals("Incorrect Exception thrown", "has failed over", ise.getMessage());
+ // Recover the sesion and try again.
+ _consumerSession.recover();
+ }
+ }
+
+ msg = _consumer.receive(1500);
+ // Validate we now get the first message back
+ assertEquals(0, msg.getIntProperty(INDEX));
+
+ msg = _consumer.receive(1500);
+ // and the second message
+ assertEquals(1, msg.getIntProperty(INDEX));
+
+ // And now verify that we can now commit the clean session
+ if (_consumerSession.getTransacted())
+ {
+ _consumerSession.commit();
+ }
+ else
+ {
+ msg.acknowledge();
+ }
+
+ assertEquals("Wrong number of messages on queue", 0,
+ ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue));
+ }
+
+ public void testDirtyClientAck() throws Exception
+ {
+ testDirtyAcking(false, Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ public void testDirtyAckingTransacted() throws Exception
+ {
+ testDirtyAcking(true, Session.SESSION_TRANSACTED);
+ }
+
+ // AMQConnectionListener Interface.. used so we can validate that we
+ // actually failed over.
+
+ public void bytesSent(long count)
+ {
+ }
+
+ public void bytesReceived(long count)
+ {
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ //Allow failover
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ //Allow failover
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ _failoverCompleted.countDown();
+ }
+
+ /**
+ * Override so we can block until failover has completd
+ *
+ * @param port
+ */
+ @Override
+ public void failBroker(int port)
+ {
+ super.failBroker(port);
+
+ try
+ {
+ if (!_failoverCompleted.await(DEFAULT_FAILOVER_TIME, TimeUnit.MILLISECONDS))
+ {
+ fail("Failover did not occur in specified time:" + DEFAULT_FAILOVER_TIME);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ fail("Failover was interuppted");
+ }
+ }
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
new file mode 100644
index 0000000000..73308d41c0
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
@@ -0,0 +1,167 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.ack;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.JMSAMQException;
+import org.apache.qpid.client.failover.FailoverException;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class AcknowledgeOnMessageTest extends AcknowledgeTest implements MessageListener
+{
+ protected CountDownLatch _receviedAll;
+ protected AtomicReference<Exception> _causeOfFailure = new AtomicReference<Exception>(null);
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ @Override
+ public void init(boolean transacted, int mode) throws Exception
+ {
+ _receviedAll = new CountDownLatch(NUM_MESSAGES);
+
+ super.init(transacted, mode);
+ _consumer.setMessageListener(this);
+ }
+
+ /**
+ * @param transacted
+ * @param mode
+ *
+ * @throws Exception
+ */
+ protected void testAcking(boolean transacted, int mode) throws Exception
+ {
+ init(transacted, mode);
+
+ _connection.start();
+
+ int lastCount = (int) _receviedAll.getCount();
+
+ boolean complete = _receviedAll.await(5000L, TimeUnit.MILLISECONDS);
+
+ while (!complete)
+ {
+ int currentCount = (int) _receviedAll.getCount();
+
+ // make sure we have received a message in the last cycle.
+ if (lastCount == currentCount)
+ {
+ break;
+ }
+
+ complete = _receviedAll.await(5000L, TimeUnit.MILLISECONDS);
+ }
+
+ if (!complete)
+ {
+ // Check to see if we ended due to an exception in the onMessage handler
+ Exception cause = _causeOfFailure.get();
+ if (cause != null)
+ {
+ cause.printStackTrace();
+ fail(cause.getMessage());
+ }
+ else
+ {
+ fail("All messages not received missing:" + _receviedAll.getCount() + "/" + NUM_MESSAGES);
+ }
+ }
+
+ // Check to see if we ended due to an exception in the onMessage handler
+ Exception cause = _causeOfFailure.get();
+ if (cause != null)
+ {
+ cause.printStackTrace();
+ fail(cause.getMessage());
+ }
+
+ try
+ {
+ _consumer.close();
+ }
+ catch (JMSAMQException amqe)
+ {
+ if (amqe.getLinkedException() instanceof FailoverException)
+ {
+ fail("QPID-143 : Auto Ack can acknowledge message from previous session after failver. If failover occurs between deliver and ack.");
+ }
+ // else Rethrow for TestCase to catch.
+ throw amqe;
+ }
+
+ _consumerSession.close();
+
+ assertEquals("Wrong number of messages on queue", 0,
+ ((AMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE)).getQueueDepth((AMQDestination) _queue));
+ }
+
+ public void onMessage(Message message)
+ {
+ try
+ {
+ int count = NUM_MESSAGES - (int) _receviedAll.getCount();
+
+ assertEquals("Incorrect message received", count, message.getIntProperty(INDEX));
+
+ count++;
+ if (count < NUM_MESSAGES)
+ {
+ //Send the next message
+ _producer.send(createNextMessage(_consumerSession, count));
+ }
+
+ doAcknowlegement(message);
+
+ _receviedAll.countDown();
+ }
+ catch (Exception e)
+ {
+ // This will end the test run by counting down _receviedAll
+ fail(e);
+ }
+ }
+
+ /**
+ * Pass the given exception back to the waiting thread to fail the test run.
+ *
+ * @param e The exception that is causing the test to fail.
+ */
+ protected void fail(Exception e)
+ {
+ _causeOfFailure.set(e);
+ // End the test.
+ while (_receviedAll.getCount() != 0)
+ {
+ _receviedAll.countDown();
+ }
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
index c367a0856c..7c9a77eb53 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
@@ -1,7 +1,5 @@
-package org.apache.qpid.test.unit.ack;
-
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,133 +19,138 @@ package org.apache.qpid.test.unit.ack;
*
*/
+package org.apache.qpid.test.unit.ack;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.FailoverBaseCase;
+
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+import javax.jms.MessageProducer;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.test.utils.QpidTestCase;
-
-public class AcknowledgeTest extends QpidTestCase
+public class AcknowledgeTest extends FailoverBaseCase
{
- protected static int NUM_MESSAGES = 100;
- protected Connection _con;
+ protected int NUM_MESSAGES;
+ protected Connection _connection;
protected Queue _queue;
- private MessageProducer _producer;
- private Session _producerSession;
- private Session _consumerSession;
- private MessageConsumer _consumerA;
+ protected Session _consumerSession;
+ protected MessageConsumer _consumer;
+ protected MessageProducer _producer;
@Override
protected void setUp() throws Exception
{
super.setUp();
- _queue = (Queue) getInitialContext().lookup("queue");
+ NUM_MESSAGES = 5;
+
+ _queue = getTestQueue();
//Create Producer put some messages on the queue
- _con = getConnection();
- _con.start();
+ _connection = getConnection();
}
- private void init(boolean transacted, int mode) throws JMSException {
- _producerSession = _con.createSession(true, Session.AUTO_ACKNOWLEDGE);
- _consumerSession = _con.createSession(transacted, mode);
- _producer = _producerSession.createProducer(_queue);
- _consumerA = _consumerSession.createConsumer(_queue);
- }
+ protected void init(boolean transacted, int mode) throws Exception
+ {
+ _consumerSession = _connection.createSession(transacted, mode);
+ _consumer = _consumerSession.createConsumer(_queue);
+ _producer = _consumerSession.createProducer(_queue);
+
+ // These should all end up being prefetched by session
+ sendMessage(_consumerSession, _queue, 1);
+
+ assertEquals("Wrong number of messages on queue", 1,
+ ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue));
+ }
/**
- * Produces and consumes messages an either ack or commit the receipt of those messages
- *
* @param transacted
* @param mode
+ *
* @throws Exception
*/
- private void testMessageAck(boolean transacted, int mode) throws Exception
+ protected void testAcking(boolean transacted, int mode) throws Exception
{
- init(transacted, mode);
- sendMessage(_producerSession, _queue, NUM_MESSAGES/2);
- _producerSession.commit();
- MessageConsumer consumerB = _consumerSession.createConsumer(_queue);
- sendMessage(_producerSession, _queue, NUM_MESSAGES/2);
- _producerSession.commit();
+ init(transacted, mode);
+
+ _connection.start();
+
+ Message msg = _consumer.receive(1500);
+
int count = 0;
- Message msg = consumerB.receive(1500);
- while (msg != null)
+ while (count < NUM_MESSAGES)
{
- if (mode == Session.CLIENT_ACKNOWLEDGE)
+ assertNotNull("Message " + count + " not correctly received.", msg);
+ assertEquals("Incorrect message received", count, msg.getIntProperty(INDEX));
+ count++;
+
+ if (count < NUM_MESSAGES)
{
- msg.acknowledge();
+ //Send the next message
+ _producer.send(createNextMessage(_consumerSession, count));
}
- count++;
- msg = consumerB.receive(1500);
+
+ doAcknowlegement(msg);
+
+ msg = _consumer.receive(1500);
}
- if (transacted)
- {
- _consumerSession.commit();
- }
- _consumerA.close();
- consumerB.close();
- _consumerSession.close();
- assertEquals("Wrong number of messages on queue", NUM_MESSAGES - count,
- ((AMQSession) _producerSession).getQueueDepth((AMQDestination) _queue));
-
- // Clean up messages that may be left on the queue
- _consumerSession = _con.createSession(transacted, mode);
- _consumerA = _consumerSession.createConsumer(_queue);
- msg = _consumerA.receive(1500);
- while (msg != null)
+
+ assertEquals("Wrong number of messages on queue", 0,
+ ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue));
+ }
+
+ /**
+ * Perform the acknowledgement of messages if additionally required.
+ *
+ * @param msg
+ *
+ * @throws JMSException
+ */
+ protected void doAcknowlegement(Message msg) throws JMSException
+ {
+ if (_consumerSession.getTransacted())
{
- if (mode == Session.CLIENT_ACKNOWLEDGE)
- {
- msg.acknowledge();
- }
- msg = _consumerA.receive(1500);
+ _consumerSession.commit();
}
- _consumerA.close();
- if (transacted)
+
+ if (_consumerSession.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
{
- _consumerSession.commit();
+ msg.acknowledge();
}
- _consumerSession.close();
}
-
- public void test2ConsumersAutoAck() throws Exception
+
+ public void testClientAck() throws Exception
{
- testMessageAck(false, Session.AUTO_ACKNOWLEDGE);
+ testAcking(false, Session.CLIENT_ACKNOWLEDGE);
}
- public void test2ConsumersClientAck() throws Exception
+ public void testAutoAck() throws Exception
{
- testMessageAck(true, Session.CLIENT_ACKNOWLEDGE);
+ testAcking(false, Session.AUTO_ACKNOWLEDGE);
}
-
- public void test2ConsumersTx() throws Exception
+
+ public void testTransacted() throws Exception
{
- testMessageAck(true, Session.AUTO_ACKNOWLEDGE);
+ testAcking(true, Session.SESSION_TRANSACTED);
}
-
- public void testIndividualAck() throws Exception
+
+ public void testDupsOk() throws Exception
{
- init(false, Session.CLIENT_ACKNOWLEDGE);
- sendMessage(_producerSession, _queue, 3);
- _producerSession.commit();
- Message msg = null;
- for (int i = 0; i < 2; i++)
- {
- msg = _consumerA.receive(RECEIVE_TIMEOUT);
- ((AbstractJMSMessage)msg).acknowledgeThis();
- }
- msg = _consumerA.receive(RECEIVE_TIMEOUT);
- msg.acknowledge();
- _con.close();
+ testAcking(false, Session.DUPS_OK_ACKNOWLEDGE);
}
-
+
+ public void testNoAck() throws Exception
+ {
+ testAcking(false, AMQSession.NO_ACKNOWLEDGE);
+ }
+
+ public void testPreAck() throws Exception
+ {
+ testAcking(false, AMQSession.PRE_ACKNOWLEDGE);
+ }
+
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/FailoverBeforeConsumingRecoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/FailoverBeforeConsumingRecoverTest.java
new file mode 100644
index 0000000000..834b17430b
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/FailoverBeforeConsumingRecoverTest.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.ack;
+
+import org.apache.qpid.jms.Session;
+
+import javax.jms.Message;
+import javax.jms.Queue;
+
+public class FailoverBeforeConsumingRecoverTest extends RecoverTest
+{
+
+ @Override
+ protected void initTest() throws Exception
+ {
+ super.initTest();
+ failBroker(getFailingPort());
+
+ Queue queue = _consumerSession.createQueue(getTestQueueName());
+ sendMessage(_connection.createSession(false, Session.AUTO_ACKNOWLEDGE), queue, SENT_COUNT);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java
new file mode 100644
index 0000000000..6c4b7ba01b
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/QuickAcking.java
@@ -0,0 +1,148 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.ack;
+
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+/**
+ * This is a quick manual test to validate acking after failover with a
+ * transacted session.
+ *
+ * Start an external broker then run this test. Std Err will print.
+ * Sent Message: 1
+ * Received Message: 1
+ *
+ * You can then restart the external broker, which will cause failover, which
+ * will be complete when the following appears.
+ *
+ * Failover Complete
+ *
+ * A second message send/receive cycle is then done to validate that the
+ * connection/session are still working.
+ *
+ */
+public class QuickAcking extends QpidTestCase implements ConnectionListener
+{
+ protected AMQConnection _connection;
+ protected Queue _queue;
+ protected Session _session;
+ protected MessageConsumer _consumer;
+ private CountDownLatch _failedOver;
+ private static final String INDEX = "INDEX";
+ private int _count = 0;
+
+ public void setUp()
+ {
+ // Prevent broker startup. Broker must be run manually.
+ }
+
+ public void test() throws Exception
+ {
+ _failedOver = new CountDownLatch(1);
+
+ _connection = new AMQConnection("amqp://guest:guest@client/test?brokerlist='localhost?retries='20'&connectdelay='2000''");
+
+ _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+ _queue = _session.createQueue("QAtest");
+ _consumer = _session.createConsumer(_queue);
+ _connection.setConnectionListener(this);
+ _connection.start();
+
+ sendAndReceive();
+
+ _failedOver.await();
+
+ sendAndReceive();
+
+ }
+
+ private void sendAndReceive()
+ throws Exception
+ {
+ sendMessage();
+
+ Message message = _consumer.receive();
+
+ if (message.getIntProperty(INDEX) != _count)
+ {
+ throw new Exception("Incorrect message recieved:" + _count);
+ }
+
+ if (_session.getTransacted())
+ {
+ _session.commit();
+ }
+ System.err.println("Recevied Message:" + _count);
+ }
+
+ private void sendMessage() throws JMSException
+ {
+ MessageProducer producer = _session.createProducer(_queue);
+ Message message = _session.createMessage();
+ _count++;
+ message.setIntProperty(INDEX, _count);
+
+ producer.send(message);
+ if (_session.getTransacted())
+ {
+ _session.commit();
+ }
+ producer.close();
+
+ System.err.println("Sent Message:" + _count);
+ }
+
+ public void bytesSent(long count)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void bytesReceived(long count)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ System.err.println("Failover Complete");
+ _failedOver.countDown();
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
index 7434fcbb30..4a123cb1dc 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
@@ -23,8 +23,7 @@ import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.Session;
-import org.apache.qpid.test.utils.QpidTestCase;
-
+import org.apache.qpid.test.utils.FailoverBaseCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,16 +34,21 @@ import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.TextMessage;
-
import java.util.concurrent.atomic.AtomicInteger;
-public class RecoverTest extends QpidTestCase
+public class RecoverTest extends FailoverBaseCase
{
- private static final Logger _logger = LoggerFactory.getLogger(RecoverTest.class);
+ static final Logger _logger = LoggerFactory.getLogger(RecoverTest.class);
private Exception _error;
private AtomicInteger count;
+ protected AMQConnection _connection;
+ protected Session _consumerSession;
+ protected MessageConsumer _consumer;
+ static final int SENT_COUNT = 4;
+
+ @Override
protected void setUp() throws Exception
{
super.setUp();
@@ -52,134 +56,110 @@ public class RecoverTest extends QpidTestCase
count = new AtomicInteger();
}
- protected void tearDown() throws Exception
+ protected void initTest() throws Exception
{
- super.tearDown();
- count = null;
- }
+ _connection = (AMQConnection) getConnection("guest", "guest");
- public void testRecoverResendsMsgs() throws Exception
- {
- AMQConnection con = (AMQConnection) getConnection("guest", "guest");
-
- Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue =
- new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("someQ"),
- new AMQShortString("someQ"), false, true);
- MessageConsumer consumer = consumerSession.createConsumer(queue);
- // force synch to ensure the consumer has resulted in a bound queue
- // ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
- // This is the default now
+ _consumerSession = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = _consumerSession.createQueue(getTestQueueName());
- AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
- Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(queue);
+ _consumer = _consumerSession.createConsumer(queue);
_logger.info("Sending four messages");
- producer.send(producerSession.createTextMessage("msg1"));
- producer.send(producerSession.createTextMessage("msg2"));
- producer.send(producerSession.createTextMessage("msg3"));
- producer.send(producerSession.createTextMessage("msg4"));
-
- con2.close();
-
+ sendMessage(_connection.createSession(false, Session.AUTO_ACKNOWLEDGE), queue, SENT_COUNT);
_logger.info("Starting connection");
- con.start();
- TextMessage tm = (TextMessage) consumer.receive();
- tm.acknowledge();
- _logger.info("Received and acknowledged first message");
- consumer.receive();
- consumer.receive();
- consumer.receive();
- _logger.info("Received all four messages. Calling recover with three outstanding messages");
- // no ack for last three messages so when I call recover I expect to get three messages back
- consumerSession.recover();
- tm = (TextMessage) consumer.receive(3000);
- assertEquals("msg2", tm.getText());
+ _connection.start();
+ }
- tm = (TextMessage) consumer.receive(3000);
- assertEquals("msg3", tm.getText());
+ protected Message validateNextMessages(int nextCount, int startIndex) throws JMSException
+ {
+ Message message = null;
+ for (int index = 0; index < nextCount; index++)
+ {
+ message = _consumer.receive(3000);
+ assertEquals(startIndex + index, message.getIntProperty(INDEX));
+ }
+ return message;
+ }
- tm = (TextMessage) consumer.receive(3000);
- assertEquals("msg4", tm.getText());
+ protected void validateRemainingMessages(int remaining) throws JMSException
+ {
+ int index = SENT_COUNT - remaining;
- _logger.info("Received redelivery of three messages. Acknowledging last message");
- tm.acknowledge();
+ Message message = null;
+ while (index != SENT_COUNT)
+ {
+ message = _consumer.receive(3000);
+ assertEquals(index++, message.getIntProperty(INDEX));
+ }
+
+ if (message != null)
+ {
+ _logger.info("Received redelivery of three messages. Acknowledging last message");
+ message.acknowledge();
+ }
_logger.info("Calling acknowledge with no outstanding messages");
// all acked so no messages to be delivered
- consumerSession.recover();
+ _consumerSession.recover();
- tm = (TextMessage) consumer.receiveNoWait();
- assertNull(tm);
+ message = _consumer.receiveNoWait();
+ assertNull(message);
_logger.info("No messages redelivered as is expected");
-
- con.close();
}
- public void testRecoverResendsMsgsAckOnEarlier() throws Exception
+ public void testRecoverResendsMsgs() throws Exception
{
- AMQConnection con = (AMQConnection) getConnection("guest", "guest");
+ initTest();
- Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue =
- new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("someQ"),
- new AMQShortString("someQ"), false, true);
- MessageConsumer consumer = consumerSession.createConsumer(queue);
- // force synch to ensure the consumer has resulted in a bound queue
- // ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
- // This is the default now
+ Message message = validateNextMessages(1, 0);
+ message.acknowledge();
+ _logger.info("Received and acknowledged first message");
- AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
- Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(queue);
+ _consumer.receive();
+ _consumer.receive();
+ _consumer.receive();
+ _logger.info("Received all four messages. Calling recover with three outstanding messages");
+ // no ack for last three messages so when I call recover I expect to get three messages back
- _logger.info("Sending four messages");
- producer.send(producerSession.createTextMessage("msg1"));
- producer.send(producerSession.createTextMessage("msg2"));
- producer.send(producerSession.createTextMessage("msg3"));
- producer.send(producerSession.createTextMessage("msg4"));
+ _consumerSession.recover();
- con2.close();
+ validateRemainingMessages(3);
+ }
- _logger.info("Starting connection");
- con.start();
- TextMessage tm = (TextMessage) consumer.receive();
- consumer.receive();
- tm.acknowledge();
+ public void testRecoverResendsMsgsAckOnEarlier() throws Exception
+ {
+ initTest();
+
+ Message message = validateNextMessages(2, 0);
+ message.acknowledge();
_logger.info("Received 2 messages, acknowledge() first message, should acknowledge both");
- consumer.receive();
- consumer.receive();
+ _consumer.receive();
+ _consumer.receive();
_logger.info("Received all four messages. Calling recover with two outstanding messages");
// no ack for last three messages so when I call recover I expect to get three messages back
- consumerSession.recover();
- TextMessage tm3 = (TextMessage) consumer.receive(3000);
- assertEquals("msg3", tm3.getText());
+ _consumerSession.recover();
+
+ Message message2 = _consumer.receive(3000);
+ assertEquals(2, message2.getIntProperty(INDEX));
- TextMessage tm4 = (TextMessage) consumer.receive(3000);
- assertEquals("msg4", tm4.getText());
+ Message message3 = _consumer.receive(3000);
+ assertEquals(3, message3.getIntProperty(INDEX));
_logger.info("Received redelivery of two messages. calling acknolwedgeThis() first of those message");
- ((org.apache.qpid.jms.Message) tm3).acknowledgeThis();
+ ((org.apache.qpid.jms.Message) message2).acknowledgeThis();
_logger.info("Calling recover");
// all acked so no messages to be delivered
- consumerSession.recover();
+ _consumerSession.recover();
- tm4 = (TextMessage) consumer.receive(3000);
- assertEquals("msg4", tm4.getText());
- ((org.apache.qpid.jms.Message) tm4).acknowledgeThis();
+ message3 = _consumer.receive(3000);
+ assertEquals(3, message3.getIntProperty(INDEX));
+ ((org.apache.qpid.jms.Message) message3).acknowledgeThis();
- _logger.info("Calling recover");
// all acked so no messages to be delivered
- consumerSession.recover();
-
- tm = (TextMessage) consumer.receiveNoWait();
- assertNull(tm);
- _logger.info("No messages redelivered as is expected");
-
- con.close();
+ validateRemainingMessages(0);
}
public void testAcknowledgePerConsumer() throws Exception
@@ -188,11 +168,11 @@ public class RecoverTest extends QpidTestCase
Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue =
- new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"),
- false, true);
+ new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"),
+ false, true);
Queue queue2 =
- new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q2"), new AMQShortString("Q2"),
- false, true);
+ new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q2"), new AMQShortString("Q2"),
+ false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
MessageConsumer consumer2 = consumerSession.createConsumer(queue2);
@@ -231,8 +211,8 @@ public class RecoverTest extends QpidTestCase
final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue =
- new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), new AMQShortString("Q3"),
- false, true);
+ new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), new AMQShortString("Q3"),
+ false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
MessageProducer producer = consumerSession.createProducer(queue);
producer.send(consumerSession.createTextMessage("hello"));
@@ -240,50 +220,50 @@ public class RecoverTest extends QpidTestCase
final Object lock = new Object();
consumer.setMessageListener(new MessageListener()
- {
+ {
- public void onMessage(Message message)
+ public void onMessage(Message message)
+ {
+ try
{
- try
+ count.incrementAndGet();
+ if (count.get() == 1)
{
- count.incrementAndGet();
- if (count.get() == 1)
+ if (message.getJMSRedelivered())
{
- if (message.getJMSRedelivered())
- {
- setError(
+ setError(
new Exception("Message marked as redilvered on what should be first delivery attempt"));
- }
-
- consumerSession.recover();
}
- else if (count.get() == 2)
+
+ consumerSession.recover();
+ }
+ else if (count.get() == 2)
+ {
+ if (!message.getJMSRedelivered())
{
- if (!message.getJMSRedelivered())
- {
- setError(
+ setError(
new Exception(
- "Message not marked as redilvered on what should be second delivery attempt"));
- }
- }
- else
- {
- System.err.println(message);
- fail("Message delivered too many times!: " + count);
+ "Message not marked as redilvered on what should be second delivery attempt"));
}
}
- catch (JMSException e)
+ else
{
- _logger.error("Error recovering session: " + e, e);
- setError(e);
+ System.err.println(message);
+ fail("Message delivered too many times!: " + count);
}
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error recovering session: " + e, e);
+ setError(e);
+ }
- synchronized (lock)
- {
- lock.notify();
- }
+ synchronized (lock)
+ {
+ lock.notify();
}
- });
+ }
+ });
con.start();
@@ -323,9 +303,4 @@ public class RecoverTest extends QpidTestCase
{
_error = e;
}
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(RecoverTest.class);
- }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTrasactedPubilshTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTrasactedPubilshTest.java
new file mode 100644
index 0000000000..248042d2c4
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTrasactedPubilshTest.java
@@ -0,0 +1,403 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.publish;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.test.utils.FailoverBaseCase;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TransactionRolledBackException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * QPID-1816 : Whilst testing Acknoledgement after failover this completes testing
+ * of the client after failover. When we have a dirty session we should receive
+ * an error if we attempt to publish. This test ensures that both in the synchronous
+ * and asynchronous message delivery paths we receive the expected exceptions at
+ * the expected time.
+ */
+public class DirtyTrasactedPubilshTest extends FailoverBaseCase implements ConnectionListener
+{
+ protected CountDownLatch _failoverCompleted = new CountDownLatch(1);
+
+ protected int NUM_MESSAGES;
+ protected Connection _connection;
+ protected Queue _queue;
+ protected Session _consumerSession;
+ protected MessageConsumer _consumer;
+ protected MessageProducer _producer;
+
+ private static final String MSG = "MSG";
+ private static final String SEND_FROM_ON_MESSAGE_TEXT = "sendFromOnMessage";
+ protected CountDownLatch _receviedAll;
+ protected AtomicReference<Exception> _causeOfFailure = new AtomicReference<Exception>(null);
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ NUM_MESSAGES = 10;
+
+ _queue = getTestQueue();
+
+ //Create Producer put some messages on the queue
+ _connection = getConnection();
+ }
+
+ /**
+ * Initialise the test variables
+ * @param transacted is this a transacted test
+ * @param mode if not trasacted then what ack mode to use
+ * @throws Exception if there is a setup issue.
+ */
+ protected void init(boolean transacted, int mode) throws Exception
+ {
+ _consumerSession = _connection.createSession(transacted, mode);
+ _consumer = _consumerSession.createConsumer(_queue);
+ _producer = _consumerSession.createProducer(_queue);
+
+ // These should all end up being prefetched by session
+ sendMessage(_consumerSession, _queue, 1);
+
+ assertEquals("Wrong number of messages on queue", 1,
+ ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue));
+ }
+
+ /**
+ * If a transacted session has failed over whilst it has uncommitted sent
+ * data then we need to throw a TransactedRolledbackException on commit()
+ *
+ * The alternative would be to maintain a replay buffer so that the message
+ * could be resent. This is not currently implemented
+ *
+ * @throws Exception if something goes wrong.
+ */
+ public void testDirtySendingSynchronousTransacted() throws Exception
+ {
+ Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ // Ensure we get failover notifications
+ ((AMQConnection) _connection).setConnectionListener(this);
+
+ MessageProducer producer = producerSession.createProducer(_queue);
+
+ // Create and send message 0
+ Message msg = producerSession.createMessage();
+ msg.setIntProperty(INDEX, 0);
+ producer.send(msg);
+
+ // DON'T commit message .. fail connection
+
+ failBroker(getFailingPort());
+
+ // Ensure destination exists for sending
+ producerSession.createConsumer(_queue).close();
+
+ // Send the next message
+ msg.setIntProperty(INDEX, 1);
+ try
+ {
+ producer.send(msg);
+ fail("Should fail with Qpid as we provide early warning of the dirty session via a JMSException.");
+ }
+ catch (JMSException jmse)
+ {
+ assertEquals("Early warning of dirty session not correct",
+ "Failover has occurred and session is dirty so unable to send.", jmse.getMessage());
+ }
+
+ // Ignore that the session is dirty and attempt to commit to validate the
+ // exception is thrown. AND that the above failure notification did NOT
+ // clean up the session.
+
+ try
+ {
+ producerSession.commit();
+ fail("Session is dirty we should get an TransactionRolledBackException");
+ }
+ catch (TransactionRolledBackException trbe)
+ {
+ // Normal path.
+ }
+
+ // Resending of messages should now work ok as the commit was forcilbly rolledback
+ msg.setIntProperty(INDEX, 0);
+ producer.send(msg);
+ msg.setIntProperty(INDEX, 1);
+ producer.send(msg);
+
+ producerSession.commit();
+
+ assertEquals("Wrong number of messages on queue", 2,
+ ((AMQSession) producerSession).getQueueDepth((AMQDestination) _queue));
+ }
+
+ /**
+ * If a transacted session has failed over whilst it has uncommitted sent
+ * data then we need to throw a TransactedRolledbackException on commit()
+ *
+ * The alternative would be to maintain a replay buffer so that the message
+ * could be resent. This is not currently implemented
+ *
+ * @throws Exception if something goes wrong.
+ */
+ public void testDirtySendingOnMessageTransacted() throws Exception
+ {
+ NUM_MESSAGES = 1;
+ _receviedAll = new CountDownLatch(NUM_MESSAGES);
+ ((AMQConnection) _connection).setConnectionListener(this);
+
+ init(true, Session.SESSION_TRANSACTED);
+
+ _consumer.setMessageListener(new MessageListener()
+ {
+
+ public void onMessage(Message message)
+ {
+ try
+ {
+ // Create and send message 0
+ Message msg = _consumerSession.createMessage();
+ msg.setIntProperty(INDEX, 0);
+ _producer.send(msg);
+
+ // DON'T commit message .. fail connection
+
+ failBroker(getFailingPort());
+
+ // rep
+ repopulateBroker();
+
+ // Destination will exist as this failBroker will populate
+ // the queue with 1 message
+
+ // Send the next message
+ msg.setIntProperty(INDEX, 1);
+ try
+ {
+ _producer.send(msg);
+ fail("Should fail with Qpid as we provide early warning of the dirty session via a JMSException.");
+ }
+ catch (JMSException jmse)
+ {
+ assertEquals("Early warning of dirty session not correct",
+ "Failover has occurred and session is dirty so unable to send.", jmse.getMessage());
+ }
+
+ // Ignore that the session is dirty and attempt to commit to validate the
+ // exception is thrown. AND that the above failure notification did NOT
+ // clean up the session.
+
+ try
+ {
+ _consumerSession.commit();
+ fail("Session is dirty we should get an TransactionRolledBackException");
+ }
+ catch (TransactionRolledBackException trbe)
+ {
+ // Normal path.
+ }
+
+ // Resend messages
+ msg.setIntProperty(INDEX, 0);
+ msg.setStringProperty(MSG, SEND_FROM_ON_MESSAGE_TEXT);
+ _producer.send(msg);
+ msg.setIntProperty(INDEX, 1);
+ msg.setStringProperty(MSG, SEND_FROM_ON_MESSAGE_TEXT);
+ _producer.send(msg);
+
+ _consumerSession.commit();
+
+ // Stop this consumer .. can't do _consumer.stop == DEADLOCK
+ // this doesn't seem to stop dispatcher running
+ _connection.stop();
+
+ // Signal that the onMessage send part of test is complete
+ // main thread can validate that messages are correct
+ _receviedAll.countDown();
+
+ }
+ catch (Exception e)
+ {
+ fail(e);
+ }
+
+ }
+
+ });
+
+ _connection.start();
+
+ if (!_receviedAll.await(10000L, TimeUnit.MILLISECONDS))
+ {
+ // Check to see if we ended due to an exception in the onMessage handler
+ Exception cause = _causeOfFailure.get();
+ if (cause != null)
+ {
+ cause.printStackTrace();
+ fail(cause.getMessage());
+ }
+ else
+ {
+ fail("All messages not received:" + _receviedAll.getCount() + "/" + NUM_MESSAGES);
+ }
+ }
+
+ // Check to see if we ended due to an exception in the onMessage handler
+ Exception cause = _causeOfFailure.get();
+ if (cause != null)
+ {
+ cause.printStackTrace();
+ fail(cause.getMessage());
+ }
+
+ _consumer.close();
+ _consumerSession.close();
+
+ _consumerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _connection.start();
+
+ // Validate that we could send the messages as expected.
+ assertEquals("Wrong number of messages on queue", 3,
+ ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue));
+
+ MessageConsumer consumer = _consumerSession.createConsumer(_queue);
+
+ //Validate the message sent to setup the failed over broker.
+ Message message = consumer.receive(1000);
+ assertNotNull("Message " + 0 + " not received.", message);
+ assertEquals("Incorrect message received", 0, message.getIntProperty(INDEX));
+
+ // Validate the two messages sent from within the onMessage
+ for (int index = 0; index <= 1; index++)
+ {
+ message = consumer.receive(1000);
+ assertNotNull("Message " + index + " not received.", message);
+ assertEquals("Incorrect message received", index, message.getIntProperty(INDEX));
+ assertEquals("Incorrect message text for message:" + index, SEND_FROM_ON_MESSAGE_TEXT, message.getStringProperty(MSG));
+ }
+
+ assertNull("Extra message received.", consumer.receiveNoWait());
+
+ _consumerSession.close();
+
+ assertEquals("Wrong number of messages on queue", 0,
+ ((AMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE)).getQueueDepth((AMQDestination) _queue));
+ }
+
+ private void repopulateBroker() throws Exception
+ {
+ // Repopulate this new broker so we can test what happends after failover
+
+ //Get the connection to the first (main port) broker.
+ Connection connection = getConnection();
+ // Use a transaction to send messages so we can be sure they arrive.
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ // ensure destination is created.
+ session.createConsumer(_queue).close();
+
+ sendMessage(session, _queue, NUM_MESSAGES);
+
+ assertEquals("Wrong number of messages on queue", NUM_MESSAGES,
+ ((AMQSession) session).getQueueDepth((AMQDestination) _queue));
+
+ connection.close();
+ }
+
+ // AMQConnectionListener Interface.. used so we can validate that we
+ // actually failed over.
+
+ public void bytesSent(long count)
+ {
+ }
+
+ public void bytesReceived(long count)
+ {
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ //Allow failover
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ //Allow failover
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ _failoverCompleted.countDown();
+ }
+
+ /**
+ * Override so we can block until failover has completd
+ *
+ * @param port int the port of the broker to fail.
+ */
+ @Override
+ public void failBroker(int port)
+ {
+ super.failBroker(port);
+
+ try
+ {
+ if (!_failoverCompleted.await(DEFAULT_FAILOVER_TIME, TimeUnit.MILLISECONDS))
+ {
+ fail("Failover did not occur in specified time:" + DEFAULT_FAILOVER_TIME);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ fail("Failover was interuppted");
+ }
+ }
+
+ /**
+ * Pass the given exception back to the waiting thread to fail the test run.
+ *
+ * @param e The exception that is causing the test to fail.
+ */
+ protected void fail(Exception e)
+ {
+ _causeOfFailure.set(e);
+ // End the test.
+ while (_receviedAll.getCount() != 0)
+ {
+ _receviedAll.countDown();
+ }
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
index deb021fd96..fc82ff1778 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
@@ -20,42 +20,45 @@
*/
package org.apache.qpid.test.utils;
-import javax.jms.Connection;
+import org.apache.qpid.util.FileUtils;
+
+import javax.naming.NamingException;
import javax.jms.JMSException;
import javax.naming.NamingException;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class FailoverBaseCase extends QpidTestCase
{
+ protected static final Logger _logger = LoggerFactory.getLogger(FailoverBaseCase.class);
public static int FAILING_VM_PORT = 2;
public static int FAILING_PORT = Integer.parseInt(System.getProperty("test.port.alt"));
+ public static final long DEFAULT_FAILOVER_TIME = 10000L;
protected int failingPort;
-
- private boolean failedOver = false;
- public FailoverBaseCase()
+ protected int getFailingPort()
{
if (_broker.equals(VM))
{
- failingPort = FAILING_VM_PORT;
+ return FAILING_VM_PORT;
}
else
{
- failingPort = FAILING_PORT;
+ return FAILING_PORT;
}
}
-
- protected int getFailingPort()
- {
- return failingPort;
- }
protected void setUp() throws java.lang.Exception
{
super.setUp();
- setSystemProperty("QPID_WORK", System.getProperty("QPID_WORK")+"/"+getFailingPort());
- startBroker(failingPort);
+ // Set QPID_WORK to $QPID_WORK/<getFailingPort()>
+ // or /tmp/<getFailingPort()> if QPID_WORK not set.
+ setSystemProperty("QPID_WORK", System.getProperty("QPID_WORK", System.getProperty("java.io.tmpdir")) + "/" + getFailingPort());
+ startBroker(getFailingPort());
}
/**
@@ -64,41 +67,52 @@ public class FailoverBaseCase extends QpidTestCase
* @return a connection
* @throws Exception
*/
- public Connection getConnection() throws JMSException, NamingException
+ @Override
+ public AMQConnectionFactory getConnectionFactory() throws NamingException
{
- Connection conn =
- (Boolean.getBoolean("profile.use_ssl"))?
- getConnectionFactory("failover.ssl").createConnection("guest", "guest"):
- getConnectionFactory("failover").createConnection("guest", "guest");
- _connections.add(conn);
- return conn;
+ _logger.info("get ConnectionFactory");
+ if (_connectionFactory == null)
+ {
+ if (Boolean.getBoolean("profile.use_ssl"))
+ {
+ _connectionFactory = getConnectionFactory("failover.ssl");
+ }
+ else
+ {
+ _connectionFactory = getConnectionFactory("failover");
+ }
+ }
+ return _connectionFactory;
}
+
public void tearDown() throws Exception
{
- stopBroker(_broker.equals(VM)?FAILING_PORT:FAILING_PORT);
- super.tearDown();
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ // Ensure we shutdown any secondary brokers, even if we are unable
+ // to cleanly tearDown the QTC.
+ stopBroker(getFailingPort());
+ FileUtils.deleteDirectory(System.getProperty("QPID_WORK", System.getProperty("java.io.tmpdir")) + "/" + getFailingPort());
+ }
}
- /**
- * Only used of VM borker.
- */
- public void failBroker()
+ public void failBroker(int port)
{
- failedOver = true;
try
{
- stopBroker(getFailingPort());
+ stopBroker(port);
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
-
- protected void setFailingPort(int p)
- {
- failingPort = p;
- }
+
+
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
index 2f40d3a62a..6c1b1c7b8d 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
@@ -22,8 +22,10 @@ import junit.framework.TestResult;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionURL;
@@ -81,6 +83,8 @@ public class QpidTestCase extends TestCase
private XMLConfiguration _testConfiguration = new XMLConfiguration();
+ protected static final String INDEX = "index";
+
/**
* Some tests are excluded when the property test.excludes is set to true.
* An exclusion list is either a file (prop test.excludesfile) which contains one test name
@@ -183,7 +187,7 @@ public class QpidTestCase extends TestCase
private Map<Integer, Process> _brokers = new HashMap<Integer, Process>();
private InitialContext _initialContext;
- private AMQConnectionFactory _connectionFactory;
+ protected AMQConnectionFactory _connectionFactory;
private String _testName;
@@ -193,8 +197,6 @@ public class QpidTestCase extends TestCase
public static final String TOPIC = "topic";
/** Map to hold test defined environment properties */
private Map<String, String> _env;
- protected static final String INDEX = "index";
- ;
public QpidTestCase(String name)
{
@@ -929,7 +931,7 @@ public class QpidTestCase extends TestCase
{
if (Boolean.getBoolean("profile.use_ssl"))
{
- _connectionFactory = getConnectionFactory("ssl");
+ _connectionFactory = getConnectionFactory("default.ssl");
}
else
{
@@ -1019,16 +1021,32 @@ public class QpidTestCase extends TestCase
return getClass().getSimpleName() + "-" + getName();
}
+ /**
+ * Return a Queue specific for this test.
+ * Uses getTestQueueName() as the name of the queue
+ * @return
+ */
+ public Queue getTestQueue()
+ {
+ return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, getTestQueueName());
+ }
+
+
protected void tearDown() throws java.lang.Exception
{
- // close all the connections used by this test.
- for (Connection c : _connections)
+ try
{
- c.close();
+ // close all the connections used by this test.
+ for (Connection c : _connections)
+ {
+ c.close();
+ }
+ }
+ finally{
+ // Ensure any problems with close does not interfer with property resets
+ revertSystemProperties();
+ revertLoggingLevels();
}
-
- revertSystemProperties();
- revertLoggingLevels();
}
/**
@@ -1065,17 +1083,23 @@ public class QpidTestCase extends TestCase
public List<Message> sendMessage(Session session, Destination destination,
int count) throws Exception
{
- return sendMessage(session, destination, count, 0);
+ return sendMessage(session, destination, count, 0, 0);
}
public List<Message> sendMessage(Session session, Destination destination,
int count, int batchSize) throws Exception
{
+ return sendMessage(session, destination, count, 0, batchSize);
+ }
+
+ public List<Message> sendMessage(Session session, Destination destination,
+ int count, int offset, int batchSize) throws Exception
+ {
List<Message> messages = new ArrayList<Message>(count);
MessageProducer producer = session.createProducer(destination);
- for (int i = 0; i < count; i++)
+ for (int i = offset; i < (count + offset); i++)
{
Message next = createNextMessage(session, i);
@@ -1094,8 +1118,11 @@ public class QpidTestCase extends TestCase
}
// Ensure we commit the last messages
- if (session.getTransacted() && (batchSize > 0) &&
- (count / batchSize != 0))
+ // Commit the session if we are transacted and
+ // we have no batchSize or
+ // our count is not divible by batchSize.
+ if (session.getTransacted() &&
+ ( batchSize == 0 || count % batchSize != 0))
{
session.commit();
}
@@ -1106,7 +1133,6 @@ public class QpidTestCase extends TestCase
public Message createNextMessage(Session session, int msgCount) throws JMSException
{
Message message = session.createMessage();
-
message.setIntProperty(INDEX, msgCount);
return message;
diff --git a/qpid/java/test-profiles/Excludes b/qpid/java/test-profiles/Excludes
index a72d3bc86c..d14d467b89 100644
--- a/qpid/java/test-profiles/Excludes
+++ b/qpid/java/test-profiles/Excludes
@@ -19,3 +19,14 @@ org.apache.qpid.server.logging.DerbyMessageStoreLoggingTest#*
// QPID-2081 :The configuration changes are now highlighting the close race condition
org.apache.qpid.server.security.acl.SimpleACLTest#*
+
+// QPID-1816 : Client Ack has not been addressed
+org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testDirtyClientAck
+org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testClientAck
+org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#testDirtyClientAck
+org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#testClientAck
+
+
+// QPID-143 : Failover can occur between receive and ack but we don't stop the ack.
+org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testAutoAck
+org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testDupsOk
diff --git a/qpid/java/test-profiles/test-provider.properties b/qpid/java/test-profiles/test-provider.properties
index a349b0fcbf..70a2672263 100644
--- a/qpid/java/test-profiles/test-provider.properties
+++ b/qpid/java/test-profiles/test-provider.properties
@@ -29,14 +29,13 @@ test.port.alt.ssl=25671
connectionfactory.default = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port}'
+connectionfactory.default.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.ssl}?ssl='true''
connectionfactory.default.vm = amqp://username:password@clientid/test?brokerlist='vm://:1'
-connectionfactory.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.ssl}?ssl='true''
connectionfactory.failover = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt};tcp://localhost:${test.port}'&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20''
-
connectionfactory.failover.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt.ssl}?ssl='true';tcp://localhost:${test.port.ssl}?ssl='true''&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20''
-
connectionfactory.failover.vm = amqp://username:password@clientid/test?brokerlist='vm://:2;vm://:1'
+
connectionfactory.connection1 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port}'
connectionfactory.connection2 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt}'
connectionfactory.connection1.vm = amqp://username:password@clientid/test?brokerlist='vm://:1'