summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java226
1 files changed, 226 insertions, 0 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
new file mode 100644
index 0000000000..1b4407f255
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
@@ -0,0 +1,226 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.ack;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.JMSAMQException;
+import org.apache.qpid.client.failover.FailoverException;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This test extends the synchronous AcknowledgeTest to use a MessageListener
+ * and receive messages asynchronously.
+ */
+public class AcknowledgeOnMessageTest extends AcknowledgeTest implements MessageListener
+{
+ protected CountDownLatch _receivedAll;
+ protected AtomicReference<Exception> _causeOfFailure = new AtomicReference<Exception>(null);
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ /**
+ * Override the synchronous AcknowledgeTest init to provide the _receivedAll
+ * CountDownLatch init and ensure that we set the MessageListener.
+ * @param transacted
+ * @param mode
+ * @throws Exception
+ */
+ @Override
+ public void init(boolean transacted, int mode) throws Exception
+ {
+ _receivedAll = new CountDownLatch(NUM_MESSAGES);
+
+ super.init(transacted, mode);
+ _consumer.setMessageListener(this);
+ }
+
+ /**
+ * This test overrides the testAcking from the simple recieve() model to all
+ * for asynchronous receiving of messages.
+ *
+ * Again the transaction/ack mode is provided to this main test run
+ *
+ * The init method is called which will setup the listener so that we can
+ * then sit and await using the _receivedAll CountDownLatch. We wait for up
+ * to 10s if no messages have been received in the last 10s then test will
+ * fail.
+ *
+ * If the test fails then it will attempt to retrieve any exception that the
+ * asynchronous delivery thread may have recorded.
+ *
+ * @param transacted
+ * @param mode
+ *
+ * @throws Exception
+ */
+ @Override
+ protected void testAcking(boolean transacted, int mode) throws Exception
+ {
+ init(transacted, mode);
+
+ _connection.start();
+
+ // Set the lastCount to NUM_MESSAGES, this ensures that the compare
+ // against the receviedAll count is accurate.
+ int lastCount = NUM_MESSAGES;
+
+ // Wait for messages to arrive
+ boolean complete = _receivedAll.await(10000L, TimeUnit.MILLISECONDS);
+
+ // If the messasges haven't arrived
+ while (!complete)
+ {
+ // Check how many we have received
+ int currentCount = (int) _receivedAll.getCount();
+
+ // make sure we have received a message in the last cycle.
+ if (lastCount == currentCount)
+ {
+ // If we didn't receive any messages then stop.
+ // Something must have gone wrong.
+ System.err.println("Giving up waiting as we didn't receive anything.");
+ break;
+ }
+ // Remember the currentCount as the lastCount for the next cycle.
+ // so we can exit if things get locked up.
+ lastCount = currentCount;
+
+ // Wait again for messages to arrive.
+ complete = _receivedAll.await(10000L, TimeUnit.MILLISECONDS);
+ }
+
+ // If we failed to receive all the messages then fail the test.
+ if (!complete)
+ {
+ // Check to see if we ended due to an exception in the onMessage handler
+ Exception cause = _causeOfFailure.get();
+ if (cause != null)
+ {
+ cause.printStackTrace();
+ fail(cause.getMessage());
+ }
+ else
+ {
+ _logger.info("AOMT: Check QueueDepth:" + _queue);
+ long onQueue=((AMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE)).getQueueDepth((AMQDestination) _queue);
+ fail("All messages not received missing:" + _receivedAll.getCount() + "/" + NUM_MESSAGES+" On Queue:"+onQueue);
+
+ }
+ }
+
+ // Even if we received all the messages.
+ // Check to see if we ended due to an exception in the onMessage handler
+ Exception cause = _causeOfFailure.get();
+ if (cause != null)
+ {
+ cause.printStackTrace();
+ fail(cause.getMessage());
+ }
+
+ try
+ {
+ _consumer.close();
+ }
+ catch (JMSAMQException amqe)
+ {
+ if (amqe.getLinkedException() instanceof FailoverException)
+ {
+ fail("QPID-143 : Auto Ack can acknowledge message from previous session after failver. If failover occurs between deliver and ack.");
+ }
+ // else Rethrow for TestCase to catch.
+ throw amqe;
+ }
+
+ _consumerSession.close();
+
+ _logger.info("AOMT: check number of message at end of test.");
+ assertEquals("Wrong number of messages on queue", 0,
+ ((AMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE)).getQueueDepth((AMQDestination) _queue));
+ }
+
+ /**
+ * The MessageListener interface that recieves the message and counts down
+ * the _receivedAll CountDownLatch.
+ *
+ * Again like AcknowledgeTest acknowledgement is actually handled in
+ * doAcknowlegement.
+ *
+ * The message INDEX is validated to ensure the correct message order is
+ * preserved.
+ *
+ * @param message
+ */
+ public void onMessage(Message message)
+ {
+ // Log received Message for debugging
+ _logger.info("RECEIVED MESSAGE:" + message);
+
+ try
+ {
+ int count = NUM_MESSAGES - (int) _receivedAll.getCount();
+
+ assertEquals("Incorrect message received", count, message.getIntProperty(INDEX));
+
+ count++;
+ if (count < NUM_MESSAGES)
+ {
+ //Send the next message
+ _producer.send(createNextMessage(_consumerSession, count));
+ }
+
+ doAcknowlegement(message);
+
+ _receivedAll.countDown();
+ }
+ catch (Exception e)
+ {
+ // This will end the test run by counting down _receivedAll
+ fail(e);
+ }
+ }
+
+ /**
+ * Pass the given exception back to the waiting thread to fail the test run.
+ *
+ * @param e The exception that is causing the test to fail.
+ */
+ protected void fail(Exception e)
+ {
+ //record the failure
+ _causeOfFailure.set(e);
+ // End the test.
+ while (_receivedAll.getCount() != 0)
+ {
+ _receivedAll.countDown();
+ }
+ }
+}