summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/TTLTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/TTLTest.java')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/TTLTest.java154
1 files changed, 154 insertions, 0 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/TTLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/TTLTest.java
new file mode 100644
index 0000000000..d4bab657d7
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/TTLTest.java
@@ -0,0 +1,154 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.testcases;
+
+import org.apache.qpid.test.framework.Circuit;
+import org.apache.qpid.test.framework.FrameworkBaseCase;
+import static org.apache.qpid.test.framework.MessagingTestConfigProperties.ACK_MODE_PROPNAME;
+import static org.apache.qpid.test.framework.MessagingTestConfigProperties.PUBSUB_PROPNAME;
+import org.apache.qpid.test.framework.TestUtils;
+import org.apache.qpid.test.framework.localcircuit.LocalCircuitImpl;
+import org.apache.qpid.test.framework.sequencers.CircuitFactory;
+
+import javax.jms.*;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * TTLTest checks that time-to-live is applied to messages. The test sends messages with a variety of TTL stamps on them
+ * then after a pause attempts to receive those messages. Only messages with a large enough TTL to have survived the pause
+ * should be receiveable. This test case also applies an additional assertion against the broker, that the message store
+ * is empty at the end of the test.
+ *
+ * <p/>This test is designed to run over local circuits only, as it must control a timed pause between sending and receiving
+ * messages to that TTL can be applied to purge some of the messages.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td>
+ * </table>
+ *
+ * @todo Use an interface or other method to mark this test as local only.
+ *
+ * @todo Implement the message store assertion for in-vm broker. Could also be done for external broker, for example
+ * by using diagnostic exchange.
+ *
+ * @todo Implement and add a queue depth assertion too. This might already be in another test to copy from.
+ *
+ * @todo Create variations on test theme, for different ack mode and tx and message sizes etc.
+ *
+ * @todo Add an allowable margin of error to the test, as ttl may not be precise.
+ */
+public class TTLTest extends FrameworkBaseCase
+{
+ /**
+ * Creates a new test case with the specified name.
+ *
+ * @param name The test case name.
+ */
+ public TTLTest(String name)
+ {
+ super(name);
+ }
+
+ /**
+ * Checks that all messages sent with a TTL shorter than a pause between sending them and attempting to receive them
+ * will fail to arrive. Once all messages have been purged by TTL or received, check that they no longer exist on
+ * the broker.
+ *
+ * @throws javax.jms.JMSException Allowed to fall through and fail test.
+ */
+ public void testTTLP2P() throws Exception
+ {
+ String errorMessages = "";
+ Random r = new Random();
+
+ // Used to accumulate correctly received messages in.
+ List<Message> receivedMessages = new LinkedList<Message>();
+
+ // Set up the test properties to match the test case requirements.
+ getTestProps().setProperty(ACK_MODE_PROPNAME, Session.AUTO_ACKNOWLEDGE);
+ getTestProps().setProperty(PUBSUB_PROPNAME, false);
+
+ // Create the test circuit from the test configuration parameters.
+ CircuitFactory circuitFactory = getCircuitFactory();
+ Circuit testCircuit = circuitFactory.createCircuit(getConnection(), getTestProps());
+
+ // This test case assumes it is using a local circuit.
+ LocalCircuitImpl localCircuit = (LocalCircuitImpl) testCircuit;
+
+ Session producerSession = localCircuit.getLocalPublisherCircuitEnd().getSession();
+ MessageProducer producer = localCircuit.getLocalPublisherCircuitEnd().getProducer();
+ MessageConsumer consumer = localCircuit.getLocalReceiverCircuitEnd().getConsumer();
+
+ // Send some tests messages, with random TTLs, some shorter and some longer than the pause time.
+ for (int i = 0; i < 100; i++)
+ {
+ Message testMessage = TestUtils.createTestMessageOfSize(producerSession, 10);
+
+ // Set the TTL on the message and record its value in the message headers.
+ long ttl = 500 + r.nextInt(1500);
+ producer.setTimeToLive(ttl);
+ testMessage.setLongProperty("testTTL", ttl);
+
+ producer.send(testMessage);
+ // producerSession.commit();
+ }
+
+ // Inject a pause to allow some messages to be purged by TTL.
+ TestUtils.pause(1000);
+
+ // Attempt to receive back all of the messages, confirming by the message time stamps and TTLs that only
+ // those received should have avoided being purged by the TTL.
+ boolean timedOut = false;
+
+
+ Message testMessage = null;
+
+ do
+ {
+ testMessage = consumer.receive(1000);
+
+ long ttl = testMessage.getLongProperty("testTTL");
+ long timeStamp = testMessage.getJMSTimestamp();
+ long now = System.currentTimeMillis();
+
+ if ((timeStamp + ttl) < now)
+ {
+ errorMessages +=
+ "Received message [sent: " + timeStamp + ", ttl: " + ttl + ", received: " + now
+ + "] which should have been purged by its TTL.\n";
+ }
+ /*else
+ {
+ receivedMessages.add(testMessage);
+ }*/
+ } while (!timedOut && testMessage != null);
+
+ // Check that the queue and message store on the broker are empty.
+ // assertTrue("Message store is not empty.", messageStoreEmpty.apply());
+ // assertTrue("Queue is not empty.", queueEmpty.apply());
+
+ assertTrue(errorMessages, "".equals(errorMessages));
+ }
+}