summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-01-13 14:39:19 +0000
committerKeith Wall <kwall@apache.org>2012-01-13 14:39:19 +0000
commitda629e1b983ca730105c1c1443061242da0b9f16 (patch)
treea60e98857e66f87080fcc7a46a0f4eb4af686afc
parent406ff602a70cc507d93cb74b578bf867e503dfc4 (diff)
downloadqpid-python-da629e1b983ca730105c1c1443061242da0b9f16.tar.gz
QPID-3747: Prefetch behaviour test occasionally fails on 0-10 profile
Reorder the sequence of test to avoid the possibility that the 3rd message will go to the 'wrong' consumer. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1231094 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java50
1 files changed, 28 insertions, 22 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java b/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
index 5ef56045bb..d91b9b9263 100644
--- a/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
@@ -138,7 +138,7 @@ public class PrefetchBehaviourTest extends QpidBrokerTestCase
* This test was originally known as AMQConnectionTest#testPrefetchSystemProperty.
*
*/
- public void testLowPrefetchCausesMessagesToBeDistributedBetweenConsumers() throws Exception
+ public void testMessagesAreDistributedBetweenConsumersWithLowPrefetch() throws Exception
{
Queue queue = getTestQueue();
@@ -146,45 +146,51 @@ public class PrefetchBehaviourTest extends QpidBrokerTestCase
Connection connection = getConnection();
connection.start();
- // Create two consumers on different sessions
- Session consSessA = connection.createSession(true, Session.SESSION_TRANSACTED);
+ // Create Consumer A
+ Session consSessA = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumerA = consSessA.createConsumer(queue);
- _logger.debug("Consumer A " + consumerA);
+
+ // ensure message delivery to consumer A is started (required for 0-8..0-9-1)
+ final Message msg = consumerA.receiveNoWait();
+ assertNull(msg);
Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
sendMessage(producerSession, queue, 3);
+ // Create Consumer B
MessageConsumer consumerB = null;
-
if (isBroker010())
{
+ // 0-10 prefetch is per consumer so we create Consumer B on the same session as Consumer A
consumerB = consSessA.createConsumer(queue);
}
else
{
- // 0-8, 0-9, 0-9-1 prefetch is per session, not consumer.
- Session consSessB = connection.createSession(true, Session.SESSION_TRANSACTED);
+ // 0-8..0-9-1 prefetch is per session so we create Consumer B on a separate session
+ Session consSessB = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumerB = consSessB.createConsumer(queue);
}
- _logger.debug("Consumer B " + consumerB);
- Message msg;
- // Check that consumer A has 2 messages
+ // As message delivery to consumer A is already started, the first two messages should
+ // now be with consumer A. The last message will still be on the Broker as consumer A's
+ // credit is exhausted and message delivery for consumer B is not yet running.
+
+ // As described by QPID-3747, for 0-10 we *must* check Consumer B before Consumer A.
+ // If we were to reverse the order, the SessionComplete will restore Consumer A's credit,
+ // and the third message could be delivered to either Consumer A or Consumer B.
+
+ // Check that consumer B gets the last (third) message.
+ final Message msgConsumerB = consumerB.receive(1500);
+ assertNotNull("Consumer B should have received a message", msgConsumerB);
+ assertEquals("Consumer B received message with unexpected index", 2, msgConsumerB.getIntProperty(INDEX));
+
+ // Now check that consumer A has indeed got the first two messages.
for (int i = 0; i < 2; i++)
{
- msg = consumerA.receive(1500);
- assertNotNull("Consumer A should receive 2 messages", msg);
+ final Message msgConsumerA = consumerA.receive(1500);
+ assertNotNull("Consumer A should have received a message " + i, msgConsumerA);
+ assertEquals("Consumer A received message with unexpected index", i, msgConsumerA.getIntProperty(INDEX));
}
-
- _logger.debug("Checking that Consumer A does not have 3rd message");
- msg = consumerA.receive(1500);
- assertNull("Consumer A should not have received a 3rd message",msg);
-
- // Check that consumer B has the last message
- _logger.debug("Checking that Consumer B does have 3rd message");
- msg = consumerB.receive(1500);
- assertNotNull("Consumer B should have received the message",msg);
}
-
}