diff options
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/TTLTest.java')
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/TTLTest.java | 154 |
1 files changed, 154 insertions, 0 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/TTLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/TTLTest.java new file mode 100644 index 0000000000..d4bab657d7 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/TTLTest.java @@ -0,0 +1,154 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.testcases; + +import org.apache.qpid.test.framework.Circuit; +import org.apache.qpid.test.framework.FrameworkBaseCase; +import static org.apache.qpid.test.framework.MessagingTestConfigProperties.ACK_MODE_PROPNAME; +import static org.apache.qpid.test.framework.MessagingTestConfigProperties.PUBSUB_PROPNAME; +import org.apache.qpid.test.framework.TestUtils; +import org.apache.qpid.test.framework.localcircuit.LocalCircuitImpl; +import org.apache.qpid.test.framework.sequencers.CircuitFactory; + +import javax.jms.*; + +import java.util.LinkedList; +import java.util.List; +import java.util.Random; + +/** + * TTLTest checks that time-to-live is applied to messages. The test sends messages with a variety of TTL stamps on them + * then after a pause attempts to receive those messages. Only messages with a large enough TTL to have survived the pause + * should be receiveable. This test case also applies an additional assertion against the broker, that the message store + * is empty at the end of the test. + * + * <p/>This test is designed to run over local circuits only, as it must control a timed pause between sending and receiving + * messages to that TTL can be applied to purge some of the messages. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> + * </table> + * + * @todo Use an interface or other method to mark this test as local only. + * + * @todo Implement the message store assertion for in-vm broker. Could also be done for external broker, for example + * by using diagnostic exchange. + * + * @todo Implement and add a queue depth assertion too. This might already be in another test to copy from. + * + * @todo Create variations on test theme, for different ack mode and tx and message sizes etc. + * + * @todo Add an allowable margin of error to the test, as ttl may not be precise. + */ +public class TTLTest extends FrameworkBaseCase +{ + /** + * Creates a new test case with the specified name. + * + * @param name The test case name. + */ + public TTLTest(String name) + { + super(name); + } + + /** + * Checks that all messages sent with a TTL shorter than a pause between sending them and attempting to receive them + * will fail to arrive. Once all messages have been purged by TTL or received, check that they no longer exist on + * the broker. + * + * @throws javax.jms.JMSException Allowed to fall through and fail test. + */ + public void testTTLP2P() throws Exception + { + String errorMessages = ""; + Random r = new Random(); + + // Used to accumulate correctly received messages in. + List<Message> receivedMessages = new LinkedList<Message>(); + + // Set up the test properties to match the test case requirements. + getTestProps().setProperty(ACK_MODE_PROPNAME, Session.AUTO_ACKNOWLEDGE); + getTestProps().setProperty(PUBSUB_PROPNAME, false); + + // Create the test circuit from the test configuration parameters. + CircuitFactory circuitFactory = getCircuitFactory(); + Circuit testCircuit = circuitFactory.createCircuit(getConnection(), getTestProps()); + + // This test case assumes it is using a local circuit. + LocalCircuitImpl localCircuit = (LocalCircuitImpl) testCircuit; + + Session producerSession = localCircuit.getLocalPublisherCircuitEnd().getSession(); + MessageProducer producer = localCircuit.getLocalPublisherCircuitEnd().getProducer(); + MessageConsumer consumer = localCircuit.getLocalReceiverCircuitEnd().getConsumer(); + + // Send some tests messages, with random TTLs, some shorter and some longer than the pause time. + for (int i = 0; i < 100; i++) + { + Message testMessage = TestUtils.createTestMessageOfSize(producerSession, 10); + + // Set the TTL on the message and record its value in the message headers. + long ttl = 500 + r.nextInt(1500); + producer.setTimeToLive(ttl); + testMessage.setLongProperty("testTTL", ttl); + + producer.send(testMessage); + // producerSession.commit(); + } + + // Inject a pause to allow some messages to be purged by TTL. + TestUtils.pause(1000); + + // Attempt to receive back all of the messages, confirming by the message time stamps and TTLs that only + // those received should have avoided being purged by the TTL. + boolean timedOut = false; + + + Message testMessage = null; + + do + { + testMessage = consumer.receive(1000); + + long ttl = testMessage.getLongProperty("testTTL"); + long timeStamp = testMessage.getJMSTimestamp(); + long now = System.currentTimeMillis(); + + if ((timeStamp + ttl) < now) + { + errorMessages += + "Received message [sent: " + timeStamp + ", ttl: " + ttl + ", received: " + now + + "] which should have been purged by its TTL.\n"; + } + /*else + { + receivedMessages.add(testMessage); + }*/ + } while (!timedOut && testMessage != null); + + // Check that the queue and message store on the broker are empty. + // assertTrue("Message store is not empty.", messageStoreEmpty.apply()); + // assertTrue("Queue is not empty.", queueEmpty.apply()); + + assertTrue(errorMessages, "".equals(errorMessages)); + } +} |