summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-10-30 18:45:08 +0000
committerRobert Gemmell <robbie@apache.org>2011-10-30 18:45:08 +0000
commita9b01b09befb764d0deedd548ace173728017331 (patch)
treea05078678bebd45316f0336b52aa93173c8a46de
parent64accfb0c0b0ab176e6f770abad2c77eb0d8b0ff (diff)
downloadqpid-python-a9b01b09befb764d0deedd548ace173728017331.tar.gz
QPID-3562: actually commit the test too
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1195215 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java116
1 files changed, 116 insertions, 0 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
new file mode 100644
index 0000000000..c0b07f239b
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
@@ -0,0 +1,116 @@
+package org.apache.qpid.client.prefetch;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PrefetchBehaviourTest extends QpidBrokerTestCase
+{
+ protected static final Logger _logger = LoggerFactory.getLogger(PrefetchBehaviourTest.class);
+ private Connection _normalConnection;
+ private AtomicBoolean _exceptionCaught;
+ private CountDownLatch _processingStarted;
+ private CountDownLatch _processingCompleted;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _normalConnection = getConnection();
+ _exceptionCaught = new AtomicBoolean();
+ _processingStarted = new CountDownLatch(1);
+ _processingCompleted = new CountDownLatch(1);
+ }
+
+ /**
+ * Verifies that a slow processing asynchronous transacted consumer with prefetch=1 only
+ * gets 1 of the messages sent, with the second consumer picking up the others while the
+ * slow consumer is processing, i.e that prefetch=1 actually does what it says on the tin.
+ */
+ public void testPrefetchOneWithAsynchronousTransactedConsumer() throws Exception
+ {
+ final long processingTime = 5000;
+
+ //create a second connection with prefetch set to 1
+ setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString());
+ Connection prefetch1Connection = getConnection();
+
+ prefetch1Connection.start();
+ _normalConnection.start();
+
+ //create an asynchronous consumer with simulated slow processing
+ final Session prefetch1session = prefetch1Connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = prefetch1session.createQueue(getTestQueueName());
+ MessageConsumer prefetch1consumer = prefetch1session.createConsumer(queue);
+ prefetch1consumer.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ try
+ {
+ _logger.debug("starting processing");
+ _processingStarted.countDown();
+ _logger.debug("processing started");
+
+ //simulate message processing
+ Thread.sleep(processingTime);
+
+ prefetch1session.commit();
+
+ _processingCompleted.countDown();
+ }
+ catch(Exception e)
+ {
+ _logger.error("Exception caught in message listener");
+ _exceptionCaught.set(true);
+ }
+ }
+ });
+
+ //create producer and send 5 messages
+ Session producerSession = _normalConnection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ for (int i = 0; i < 5; i++)
+ {
+ producer.send(producerSession.createTextMessage("test"));
+ }
+ producerSession.commit();
+
+ //wait for the first message to start being processed by the async consumer
+ assertTrue("Async processing failed to start in allowed timeframe", _processingStarted.await(2000, TimeUnit.MILLISECONDS));
+ _logger.debug("proceeding with test");
+
+ //try to consumer the other messages with another consumer while the async procesisng occurs
+ Session normalSession = _normalConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer normalConsumer = normalSession.createConsumer(queue);
+
+ Message msg;
+ // Check that other consumer gets the other 4 messages
+ for (int i = 0; i < 4; i++)
+ {
+ msg = normalConsumer.receive(1500);
+ assertNotNull("Consumer should receive 4 messages",msg);
+ }
+ msg = normalConsumer.receive(250);
+ assertNull("Consumer should not have received a 5th message",msg);
+
+ //wait for the other consumer to finish to ensure it completes ok
+ _logger.debug("waiting for async consumer to complete");
+ assertTrue("Async processing failed to complete in allowed timeframe", _processingStarted.await(processingTime + 2000, TimeUnit.MILLISECONDS));
+ assertFalse("Unexpecte exception during async message processing",_exceptionCaught.get());
+ }
+
+}