diff options
Diffstat (limited to 'qpid/java/systests/src/test/java/org/apache/qpid/server/queue/TimeToLiveTest.java')
-rw-r--r-- | qpid/java/systests/src/test/java/org/apache/qpid/server/queue/TimeToLiveTest.java | 397 |
1 files changed, 397 insertions, 0 deletions
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/TimeToLiveTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/TimeToLiveTest.java new file mode 100644 index 0000000000..e606df3f7d --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/TimeToLiveTest.java @@ -0,0 +1,397 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.queue; + +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TopicSubscriber; +import javax.naming.NamingException; + +import org.apache.log4j.Logger; +import org.junit.Assert; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class TimeToLiveTest extends QpidBrokerTestCase +{ + private static final Logger _logger = Logger.getLogger(TimeToLiveTest.class); + + protected final String QUEUE = "TimeToLiveQueue"; + + private final long TIME_TO_LIVE = 100L; + + private static final int MSG_COUNT = 50; + private static final long SERVER_TTL_TIMEOUT = 60000L; + + public void testPassiveTTLWithPrefetch() throws Exception + { + doTestPassiveTTL(true); + } + + public void testPassiveTTL() throws Exception + { + doTestPassiveTTL(false); + + } + + private void doTestPassiveTTL(boolean prefetchMessages) throws JMSException, NamingException + { + //Create Client 1 + Connection clientConnection = getConnection(); + + Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = clientSession.createQueue(QUEUE); + + // Create then close the consumer so the queue is actually created + // Closing it then reopening it ensures that the consumer shouldn't get messages + // which should have expired and allows a shorter sleep period. See QPID-1418 + + MessageConsumer consumer = clientSession.createConsumer(queue); + consumer.close(); + + //Create Producer + Connection producerConnection = getConnection(); + + producerConnection.start(); + + // Move to a Transacted session to ensure that all messages have been delivered to broker before + // we start waiting for TTL + Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED); + + MessageProducer producer = producerSession.createProducer(queue); + + consumer = clientSession.createConsumer(queue); + if(prefetchMessages) + { + clientConnection.start(); + } + + //Set TTL + int msg = 0; + producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer)); + + producer.setTimeToLive(TIME_TO_LIVE); + + for (; msg < MSG_COUNT - 2; msg++) + { + producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer)); + } + + //Reset TTL + producer.setTimeToLive(0L); + producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer)); + + producerSession.commit(); + + + // Ensure we sleep the required amount of time. + ReentrantLock waitLock = new ReentrantLock(); + Condition wait = waitLock.newCondition(); + final long MILLIS = 1000000L; + + long waitTime = TIME_TO_LIVE * MILLIS; + while (waitTime > 0) + { + try + { + waitLock.lock(); + + waitTime = wait.awaitNanos(waitTime); + } + catch (InterruptedException e) + { + //Stop if we are interrupted + fail(e.getMessage()); + } + finally + { + waitLock.unlock(); + } + + } + + if(prefetchMessages) + { + clientConnection.close(); + clientConnection = getConnection(); + + clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + queue = clientSession.createQueue(QUEUE); + consumer = clientSession.createConsumer(queue); + } + + clientConnection.start(); + + //Receive Message 0 + // Set 5s receive time for messages we expect to receive. + Message receivedFirst = consumer.receive(5000); + Message receivedSecond = consumer.receive(5000); + Message receivedThird = consumer.receive(1000); + + // Log the messages to help diagnosis incase of failure + _logger.info("First:"+receivedFirst); + _logger.info("Second:"+receivedSecond); + _logger.info("Third:"+receivedThird); + + // Only first and last messages sent should survive expiry + Assert.assertNull("More messages received", receivedThird); + + Assert.assertNotNull("First message not received", receivedFirst); + Assert.assertTrue("First message doesn't have first set.", receivedFirst.getBooleanProperty("first")); + Assert.assertEquals("First message has incorrect TTL.", 0L, receivedFirst.getLongProperty("TTL")); + + Assert.assertNotNull("Final message not received", receivedSecond); + Assert.assertFalse("Final message has first set.", receivedSecond.getBooleanProperty("first")); + Assert.assertEquals("Final message has incorrect TTL.", 0L, receivedSecond.getLongProperty("TTL")); + + clientConnection.close(); + + producerConnection.close(); + } + + private Message nextMessage(String msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException + { + Message send = producerSession.createTextMessage("Message " + msg); + send.setBooleanProperty("first", first); + send.setStringProperty("testprop", "TimeToLiveTest"); + send.setLongProperty("TTL", producer.getTimeToLive()); + return send; + } + + + /** + * Tests the expired messages get actively deleted even on queues which have no consumers + * @throws Exception + */ + public void testActiveTTL() throws Exception + { + Connection producerConnection = getConnection(); + AMQSession producerSession = (AMQSession) producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = producerSession.createTemporaryQueue(); + producerSession.declareAndBind((AMQDestination) queue); + MessageProducer producer = producerSession.createProducer(queue); + producer.setTimeToLive(1000L); + + // send Messages + for(int i = 0; i < MSG_COUNT; i++) + { + producer.send(producerSession.createTextMessage("Message: "+i)); + } + long failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT; + + // check Queue depth for up to TIMEOUT seconds after the Queue Depth hasn't changed for 100ms. + long messageCount = MSG_COUNT; + long lastPass; + + do + { + lastPass = messageCount; + Thread.sleep(100); + messageCount = producerSession.getQueueDepth((AMQDestination) queue); + + // If we have received messages in the last loop then extend the timeout time. + // if we get messages stuck that are not expiring then the failureTime will occur + // failing the test. This will help with the scenario when the broker does not + // have enough CPU cycles to process the TTLs. + if (lastPass != messageCount) + { + failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT; + } + } + while(messageCount > 0L && System.currentTimeMillis() < failureTime); + + assertEquals("Messages not automatically expired: ", 0L, messageCount); + + producer.close(); + producerSession.close(); + producerConnection.close(); + } + + public void testPassiveTTLwithDurableSubscription() throws Exception + { + //Create Client 1 + Connection clientConnection = getConnection(); + + Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create and close the durable subscriber + AMQTopic topic = new AMQTopic((AMQConnection) clientConnection, getTestQueueName()); + TopicSubscriber durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName(),"testprop='TimeToLiveTest'", false); + durableSubscriber.close(); + + //Create Producer + Connection producerConnection = getConnection(); + + producerConnection.start(); + + // Move to a Transacted session to ensure that all messages have been delivered to broker before + // we start waiting for TTL + Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED); + + MessageProducer producer = producerSession.createProducer(topic); + + //Set TTL + int msg = 0; + producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer)); + + producer.setTimeToLive(TIME_TO_LIVE); + + for (; msg < MSG_COUNT - 2; msg++) + { + producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer)); + } + + //Reset TTL + producer.setTimeToLive(0L); + producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer)); + + producerSession.commit(); + + //resubscribe + durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName(),"testprop='TimeToLiveTest'", false); + + // Ensure we sleep the required amount of time. + ReentrantLock waitLock = new ReentrantLock(); + Condition wait = waitLock.newCondition(); + final long MILLIS = 1000000L; + + long waitTime = TIME_TO_LIVE * MILLIS; + while (waitTime > 0) + { + try + { + waitLock.lock(); + + waitTime = wait.awaitNanos(waitTime); + } + catch (InterruptedException e) + { + //Stop if we are interrupted + fail(e.getMessage()); + } + finally + { + waitLock.unlock(); + } + + } + + clientConnection.start(); + + //Receive Message 0 + // Set 5s receive time for messages we expect to receive. + Message receivedFirst = durableSubscriber.receive(5000); + Message receivedSecond = durableSubscriber.receive(5000); + Message receivedThird = durableSubscriber.receive(1000); + + // Log the messages to help diagnosis incase of failure + _logger.info("First:"+receivedFirst); + _logger.info("Second:"+receivedSecond); + _logger.info("Third:"+receivedThird); + + // Only first and last messages sent should survive expiry + Assert.assertNull("More messages received", receivedThird); + + Assert.assertNotNull("First message not received", receivedFirst); + Assert.assertTrue("First message doesn't have first set.", receivedFirst.getBooleanProperty("first")); + Assert.assertEquals("First message has incorrect TTL.", 0L, receivedFirst.getLongProperty("TTL")); + + Assert.assertNotNull("Final message not received", receivedSecond); + Assert.assertFalse("Final message has first set.", receivedSecond.getBooleanProperty("first")); + Assert.assertEquals("Final message has incorrect TTL.", 0L, receivedSecond.getLongProperty("TTL")); + + clientSession.unsubscribe(getTestQueueName()); + clientConnection.close(); + + producerConnection.close(); + } + + public void testActiveTTLwithDurableSubscription() throws Exception + { + //Create Client 1 + Connection clientConnection = getConnection(); + Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create and close the durable subscriber + AMQTopic topic = new AMQTopic((AMQConnection) clientConnection, getTestQueueName()); + TopicSubscriber durableSubscriber = clientSession.createDurableSubscriber(topic, "MyDurableTTLSubscription","testprop='TimeToLiveTest'", false); + durableSubscriber.close(); + + //Create Producer + Connection producerConnection = getConnection(); + AMQSession producerSession = (AMQSession) producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(topic); + producer.setTimeToLive(1000L); + + // send Messages + for(int i = 0; i < MSG_COUNT; i++) + { + producer.send(producerSession.createTextMessage("Message: "+i)); + } + long failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT; + + // check Queue depth for up to TIMEOUT seconds after the Queue Depth hasn't changed for 100ms. + long messageCount = MSG_COUNT; + long lastPass; + AMQQueue subcriptionQueue = new AMQQueue("amq.topic","clientid" + ":" + "MyDurableTTLSubscription"); + do + { + lastPass = messageCount; + Thread.sleep(100); + messageCount = producerSession.getQueueDepth((AMQDestination) subcriptionQueue); + + // If we have received messages in the last loop then extend the timeout time. + // if we get messages stuck that are not expiring then the failureTime will occur + // failing the test. This will help with the scenario when the broker does not + // have enough CPU cycles to process the TTLs. + if (lastPass != messageCount) + { + failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT; + } + } + while(messageCount > 0L && System.currentTimeMillis() < failureTime); + + assertEquals("Messages not automatically expired: ", 0L, messageCount); + + producer.close(); + producerSession.close(); + producerConnection.close(); + + clientSession.unsubscribe("MyDurableTTLSubscription"); + clientSession.close(); + clientConnection.close(); + } + +} |