diff options
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java')
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java | 55 |
1 files changed, 47 insertions, 8 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java index d91b9b9263..69441d2be6 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java @@ -20,22 +20,24 @@ */ package org.apache.qpid.client.prefetch; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.Connection; +import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; - -import org.apache.qpid.configuration.ClientProperties; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import javax.jms.TextMessage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; public class PrefetchBehaviourTest extends QpidBrokerTestCase @@ -192,5 +194,42 @@ public class PrefetchBehaviourTest extends QpidBrokerTestCase assertEquals("Consumer A received message with unexpected index", i, msgConsumerA.getIntProperty(INDEX)); } } + + /** + * Test Goal: Verify if connection stop releases all messages in it's prefetch buffer. + * Test Strategy: Send 10 messages to a queue. Create a consumer with maxprefetch of 5, but never consume them. + * Stop the connection. Create a new connection and a consumer with maxprefetch 10 on the same queue. + * Try to receive all 10 messages. + */ + public void testConnectionStop() throws Exception + { + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "10"); + Connection con = getConnection(); + con.start(); + Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination queue = ssn.createQueue("ADDR:my-queue;{create: always}"); + + MessageProducer prod = ssn.createProducer(queue); + for (int i=0; i<10;i++) + { + prod.send(ssn.createTextMessage("Msg" + i)); + } + + MessageConsumer consumer = ssn.createConsumer(queue); + // This is to ensure we get the first client to prefetch. + Message msg = consumer.receive(1000); + assertNotNull("The first consumer should get one message",msg); + con.stop(); + + Connection con2 = getConnection(); + con2.start(); + Session ssn2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer2 = ssn2.createConsumer(queue); + for (int i=0; i<9;i++) + { + TextMessage m = (TextMessage)consumer2.receive(1000); + assertNotNull("The second consumer should get 9 messages, but received only " + i,m); + } + } } |