summaryrefslogtreecommitdiff
path: root/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java46
1 files changed, 24 insertions, 22 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java
index 94096e412d..95808e454f 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java
@@ -21,6 +21,7 @@
package org.apache.qpid.test.client;
import org.apache.qpid.client.AMQSession_0_8;
+import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.log4j.Logger;
@@ -79,7 +80,7 @@ public class FlowControlTest extends QpidTestCase
Connection consumerConnection = getConnection();
Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- ((AMQSession_0_8) consumerSession).setPrefecthLimits(0, 256);
+ ((AMQSession_0_8) consumerSession).setPrefetchLimits(0, 256);
MessageConsumer recv = consumerSession.createConsumer(_queue);
consumerConnection.start();
@@ -91,25 +92,22 @@ public class FlowControlTest extends QpidTestCase
assertNotNull("Second message not received", r2);
assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg"));
- Message r3 = recv.receiveNoWait();
+ Message r3 = recv.receive(RECEIVE_TIMEOUT);
assertNull("Third message incorrectly delivered", r3);
- r1.acknowledge();
+ ((AbstractJMSMessage)r1).acknowledgeThis();
- r3 = recv.receiveNoWait();
+ r3 = recv.receive(RECEIVE_TIMEOUT);
assertNull("Third message incorrectly delivered", r3);
- r2.acknowledge();
+ ((AbstractJMSMessage)r2).acknowledgeThis();
r3 = recv.receive(RECEIVE_TIMEOUT);
assertNotNull("Third message not received", r3);
assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg"));
- r3.acknowledge();
- recv.close();
- consumerSession.close();
+ ((AbstractJMSMessage)r3).acknowledgeThis();
consumerConnection.close();
-
}
public void testTwoConsumersBytesFlowControl() throws Exception
@@ -152,7 +150,7 @@ public class FlowControlTest extends QpidTestCase
Connection consumerConnection = getConnection();
Session consumerSession1 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- ((AMQSession_0_8) consumerSession1).setPrefecthLimits(0, 256);
+ ((AMQSession_0_8) consumerSession1).setPrefetchLimits(0, 256);
MessageConsumer recv1 = consumerSession1.createConsumer(_queue);
consumerConnection.start();
@@ -161,21 +159,21 @@ public class FlowControlTest extends QpidTestCase
assertNotNull("First message not received", r1);
assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg"));
- Message r2 = recv1.receiveNoWait();
+ Message r2 = recv1.receive(RECEIVE_TIMEOUT);
assertNull("Second message incorrectly delivered", r2);
Session consumerSession2 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- ((AMQSession_0_8) consumerSession2).setPrefecthLimits(0, 256);
+ ((AMQSession_0_8) consumerSession2).setPrefetchLimits(0, 256);
MessageConsumer recv2 = consumerSession2.createConsumer(_queue);
- r2 = recv2.receive(100000L);//RECEIVE_TIMEOUT);
+ r2 = recv2.receive(RECEIVE_TIMEOUT);
assertNotNull("Second message not received", r2);
assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg"));
- Message r3 = recv2.receiveNoWait();
+ Message r3 = recv2.receive(RECEIVE_TIMEOUT);
assertNull("Third message incorrectly delivered", r3);
- r3 = recv1.receive(100000L);//RECEIVE_TIMEOUT);
+ r3 = recv1.receive(RECEIVE_TIMEOUT);
assertNotNull("Third message not received", r3);
assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg"));
@@ -198,13 +196,17 @@ public class FlowControlTest extends QpidTestCase
{
System.err.println("Test Run:" + ++run);
Thread.sleep(1000);
-
- test.startBroker();
- test.testBasicBytesFlowControl();
-
- Thread.sleep(1000);
-
- test.stopBroker();
+ try
+ {
+ test.startBroker();
+ test.testBasicBytesFlowControl();
+
+ Thread.sleep(1000);
+ }
+ finally
+ {
+ test.stopBroker();
+ }
}
}
}