summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/TTLTest.java
blob: d4bab657d7ef0134c581f5f66ce5732b569bf8d0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.test.testcases;

import org.apache.qpid.test.framework.Circuit;
import org.apache.qpid.test.framework.FrameworkBaseCase;
import static org.apache.qpid.test.framework.MessagingTestConfigProperties.ACK_MODE_PROPNAME;
import static org.apache.qpid.test.framework.MessagingTestConfigProperties.PUBSUB_PROPNAME;
import org.apache.qpid.test.framework.TestUtils;
import org.apache.qpid.test.framework.localcircuit.LocalCircuitImpl;
import org.apache.qpid.test.framework.sequencers.CircuitFactory;

import javax.jms.*;

import java.util.LinkedList;
import java.util.List;
import java.util.Random;

/**
 * TTLTest checks that time-to-live is applied to messages. The test sends messages with a variety of TTL stamps on them
 * then after a pause attempts to receive those messages. Only messages with a large enough TTL to have survived the pause
 * should be receiveable. This test case also applies an additional assertion against the broker, that the message store
 * is empty at the end of the test.
 *
 * <p/>This test is designed to run over local circuits only, as it must control a timed pause between sending and receiving
 * messages to that TTL can be applied to purge some of the messages.
 *
 * <p/><table id="crc"><caption>CRC Card</caption>
 * <tr><th> Responsibilities <th> Collaborations
 * <tr><td>
 * </table>
 *
 * @todo Use an interface or other method to mark this test as local only.
 *
 * @todo Implement the message store assertion for in-vm broker. Could also be done for external broker, for example
 *       by using diagnostic exchange.
 *
 * @todo Implement and add a queue depth assertion too. This might already be in another test to copy from.
 *
 * @todo Create variations on test theme, for different ack mode and tx and message sizes etc.
 *
 * @todo Add an allowable margin of error to the test, as ttl may not be precise.
 */
public class TTLTest extends FrameworkBaseCase
{
    /**
     * Creates a new test case with the specified name.
     *
     * @param name The test case name.
     */
    public TTLTest(String name)
    {
        super(name);
    }

    /**
     * Checks that all messages sent with a TTL shorter than a pause between sending them and attempting to receive them
     * will fail to arrive. Once all messages have been purged by TTL or received, check that they no longer exist on
     * the broker.
     *
     * @throws javax.jms.JMSException Allowed to fall through and fail test.
     */
    public void testTTLP2P() throws Exception
    {
        String errorMessages = "";
        Random r = new Random();

        // Used to accumulate correctly received messages in.
        List<Message> receivedMessages = new LinkedList<Message>();

        // Set up the test properties to match the test case requirements.
        getTestProps().setProperty(ACK_MODE_PROPNAME, Session.AUTO_ACKNOWLEDGE);
        getTestProps().setProperty(PUBSUB_PROPNAME, false);

        // Create the test circuit from the test configuration parameters.
        CircuitFactory circuitFactory = getCircuitFactory();
        Circuit testCircuit = circuitFactory.createCircuit(getConnection(), getTestProps());

        // This test case assumes it is using a local circuit.
        LocalCircuitImpl localCircuit = (LocalCircuitImpl) testCircuit;

        Session producerSession = localCircuit.getLocalPublisherCircuitEnd().getSession();
        MessageProducer producer = localCircuit.getLocalPublisherCircuitEnd().getProducer();
        MessageConsumer consumer = localCircuit.getLocalReceiverCircuitEnd().getConsumer();

        // Send some tests messages, with random TTLs, some shorter and some longer than the pause time.
        for (int i = 0; i < 100; i++)
        {
            Message testMessage = TestUtils.createTestMessageOfSize(producerSession, 10);

            // Set the TTL on the message and record its value in the message headers.
            long ttl = 500 + r.nextInt(1500);
            producer.setTimeToLive(ttl);
            testMessage.setLongProperty("testTTL", ttl);

            producer.send(testMessage);
            // producerSession.commit();
        }

        // Inject a pause to allow some messages to be purged by TTL.
        TestUtils.pause(1000);

        // Attempt to receive back all of the messages, confirming by the message time stamps and TTLs that only
        // those received should have avoided being purged by the TTL.
        boolean timedOut = false;


        Message testMessage = null;

        do
        {
            testMessage = consumer.receive(1000);

            long ttl = testMessage.getLongProperty("testTTL");
            long timeStamp = testMessage.getJMSTimestamp();
            long now = System.currentTimeMillis();

            if ((timeStamp + ttl) < now)
            {
                errorMessages +=
                    "Received message [sent: " + timeStamp + ", ttl: " + ttl + ", received: " + now
                    + "] which should have been purged by its TTL.\n";
            }
            /*else
            {
                receivedMessages.add(testMessage);
            }*/
        } while (!timedOut && testMessage != null);

        // Check that the queue and message store on the broker are empty.
        // assertTrue("Message store is not empty.", messageStoreEmpty.apply());
        // assertTrue("Queue is not empty.", queueEmpty.apply());

        assertTrue(errorMessages, "".equals(errorMessages));
    }
}