summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java64
1 files changed, 46 insertions, 18 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
index fedb88d008..e606df3f7d 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
@@ -21,15 +21,8 @@
package org.apache.qpid.server.queue;
-import org.junit.Assert;
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import javax.jms.Connection;
import javax.jms.JMSException;
@@ -39,8 +32,17 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
+import javax.naming.NamingException;
+
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
public class TimeToLiveTest extends QpidBrokerTestCase
{
@@ -53,18 +55,29 @@ public class TimeToLiveTest extends QpidBrokerTestCase
private static final int MSG_COUNT = 50;
private static final long SERVER_TTL_TIMEOUT = 60000L;
+ public void testPassiveTTLWithPrefetch() throws Exception
+ {
+ doTestPassiveTTL(true);
+ }
+
public void testPassiveTTL() throws Exception
{
+ doTestPassiveTTL(false);
+
+ }
+
+ private void doTestPassiveTTL(boolean prefetchMessages) throws JMSException, NamingException
+ {
//Create Client 1
Connection clientConnection = getConnection();
-
+
Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = clientSession.createQueue(QUEUE);
-
+ Queue queue = clientSession.createQueue(QUEUE);
+
// Create then close the consumer so the queue is actually created
// Closing it then reopening it ensures that the consumer shouldn't get messages
// which should have expired and allows a shorter sleep period. See QPID-1418
-
+
MessageConsumer consumer = clientSession.createConsumer(queue);
consumer.close();
@@ -79,6 +92,12 @@ public class TimeToLiveTest extends QpidBrokerTestCase
MessageProducer producer = producerSession.createProducer(queue);
+ consumer = clientSession.createConsumer(queue);
+ if(prefetchMessages)
+ {
+ clientConnection.start();
+ }
+
//Set TTL
int msg = 0;
producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer));
@@ -96,7 +115,6 @@ public class TimeToLiveTest extends QpidBrokerTestCase
producerSession.commit();
- consumer = clientSession.createConsumer(queue);
// Ensure we sleep the required amount of time.
ReentrantLock waitLock = new ReentrantLock();
@@ -124,6 +142,16 @@ public class TimeToLiveTest extends QpidBrokerTestCase
}
+ if(prefetchMessages)
+ {
+ clientConnection.close();
+ clientConnection = getConnection();
+
+ clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ queue = clientSession.createQueue(QUEUE);
+ consumer = clientSession.createConsumer(queue);
+ }
+
clientConnection.start();
//Receive Message 0
@@ -131,14 +159,14 @@ public class TimeToLiveTest extends QpidBrokerTestCase
Message receivedFirst = consumer.receive(5000);
Message receivedSecond = consumer.receive(5000);
Message receivedThird = consumer.receive(1000);
-
+
// Log the messages to help diagnosis incase of failure
_logger.info("First:"+receivedFirst);
_logger.info("Second:"+receivedSecond);
_logger.info("Third:"+receivedThird);
// Only first and last messages sent should survive expiry
- Assert.assertNull("More messages received", receivedThird);
+ Assert.assertNull("More messages received", receivedThird);
Assert.assertNotNull("First message not received", receivedFirst);
Assert.assertTrue("First message doesn't have first set.", receivedFirst.getBooleanProperty("first"));