diff options
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java')
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java | 64 |
1 files changed, 46 insertions, 18 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java index fedb88d008..e606df3f7d 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java @@ -21,15 +21,8 @@ package org.apache.qpid.server.queue; -import org.junit.Assert; -import org.apache.log4j.Logger; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.test.utils.QpidBrokerTestCase; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import javax.jms.Connection; import javax.jms.JMSException; @@ -39,8 +32,17 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TopicSubscriber; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; +import javax.naming.NamingException; + +import org.apache.log4j.Logger; +import org.junit.Assert; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.test.utils.QpidBrokerTestCase; public class TimeToLiveTest extends QpidBrokerTestCase { @@ -53,18 +55,29 @@ public class TimeToLiveTest extends QpidBrokerTestCase private static final int MSG_COUNT = 50; private static final long SERVER_TTL_TIMEOUT = 60000L; + public void testPassiveTTLWithPrefetch() throws Exception + { + doTestPassiveTTL(true); + } + public void testPassiveTTL() throws Exception { + doTestPassiveTTL(false); + + } + + private void doTestPassiveTTL(boolean prefetchMessages) throws JMSException, NamingException + { //Create Client 1 Connection clientConnection = getConnection(); - + Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = clientSession.createQueue(QUEUE); - + Queue queue = clientSession.createQueue(QUEUE); + // Create then close the consumer so the queue is actually created // Closing it then reopening it ensures that the consumer shouldn't get messages // which should have expired and allows a shorter sleep period. See QPID-1418 - + MessageConsumer consumer = clientSession.createConsumer(queue); consumer.close(); @@ -79,6 +92,12 @@ public class TimeToLiveTest extends QpidBrokerTestCase MessageProducer producer = producerSession.createProducer(queue); + consumer = clientSession.createConsumer(queue); + if(prefetchMessages) + { + clientConnection.start(); + } + //Set TTL int msg = 0; producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer)); @@ -96,7 +115,6 @@ public class TimeToLiveTest extends QpidBrokerTestCase producerSession.commit(); - consumer = clientSession.createConsumer(queue); // Ensure we sleep the required amount of time. ReentrantLock waitLock = new ReentrantLock(); @@ -124,6 +142,16 @@ public class TimeToLiveTest extends QpidBrokerTestCase } + if(prefetchMessages) + { + clientConnection.close(); + clientConnection = getConnection(); + + clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + queue = clientSession.createQueue(QUEUE); + consumer = clientSession.createConsumer(queue); + } + clientConnection.start(); //Receive Message 0 @@ -131,14 +159,14 @@ public class TimeToLiveTest extends QpidBrokerTestCase Message receivedFirst = consumer.receive(5000); Message receivedSecond = consumer.receive(5000); Message receivedThird = consumer.receive(1000); - + // Log the messages to help diagnosis incase of failure _logger.info("First:"+receivedFirst); _logger.info("Second:"+receivedSecond); _logger.info("Third:"+receivedThird); // Only first and last messages sent should survive expiry - Assert.assertNull("More messages received", receivedThird); + Assert.assertNull("More messages received", receivedThird); Assert.assertNotNull("First message not received", receivedFirst); Assert.assertTrue("First message doesn't have first set.", receivedFirst.getBooleanProperty("first")); |