summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java55
1 files changed, 47 insertions, 8 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
index d91b9b9263..69441d2be6 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
@@ -20,22 +20,24 @@
*/
package org.apache.qpid.client.prefetch;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
import javax.jms.Connection;
+import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
-
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
public class PrefetchBehaviourTest extends QpidBrokerTestCase
@@ -192,5 +194,42 @@ public class PrefetchBehaviourTest extends QpidBrokerTestCase
assertEquals("Consumer A received message with unexpected index", i, msgConsumerA.getIntProperty(INDEX));
}
}
+
+ /**
+ * Test Goal: Verify if connection stop releases all messages in it's prefetch buffer.
+ * Test Strategy: Send 10 messages to a queue. Create a consumer with maxprefetch of 5, but never consume them.
+ * Stop the connection. Create a new connection and a consumer with maxprefetch 10 on the same queue.
+ * Try to receive all 10 messages.
+ */
+ public void testConnectionStop() throws Exception
+ {
+ setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "10");
+ Connection con = getConnection();
+ con.start();
+ Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination queue = ssn.createQueue("ADDR:my-queue;{create: always}");
+
+ MessageProducer prod = ssn.createProducer(queue);
+ for (int i=0; i<10;i++)
+ {
+ prod.send(ssn.createTextMessage("Msg" + i));
+ }
+
+ MessageConsumer consumer = ssn.createConsumer(queue);
+ // This is to ensure we get the first client to prefetch.
+ Message msg = consumer.receive(1000);
+ assertNotNull("The first consumer should get one message",msg);
+ con.stop();
+
+ Connection con2 = getConnection();
+ con2.start();
+ Session ssn2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer2 = ssn2.createConsumer(queue);
+ for (int i=0; i<9;i++)
+ {
+ TextMessage m = (TextMessage)consumer2.receive(1000);
+ assertNotNull("The second consumer should get 9 messages, but received only " + i,m);
+ }
+ }
}