summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java59
1 files changed, 35 insertions, 24 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index c5cdb83bbf..2a44413ac8 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -51,7 +51,13 @@ import javax.jms.TopicSubscriber;
public class DurableSubscriptionTest extends QpidTestCase
{
private static final Logger _logger = LoggerFactory.getLogger(DurableSubscriptionTest.class);
-
+
+ /** Timeout for receive() if we are expecting a message */
+ private static final long POSITIVE_RECEIVE_TIMEOUT = 2000;
+
+ /** Timeout for receive() if we are not expecting a message */
+ private static final long NEGATIVE_RECEIVE_TIMEOUT = 1000;
+
public void testUnsubscribe() throws Exception
{
AMQConnection con = (AMQConnection) getConnection("guest", "guest");
@@ -76,16 +82,18 @@ public class DurableSubscriptionTest extends QpidTestCase
Message msg;
_logger.info("Receive message on consumer 1:expecting A");
- msg = consumer1.receive();
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Message should have been received",msg);
assertEquals("A", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
- msg = consumer1.receive(1000);
+ msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
- _logger.info("Receive message on consumer 1:expecting A");
- msg = consumer2.receive();
+ _logger.info("Receive message on consumer 2:expecting A");
+ msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Message should have been received",msg);
assertEquals("A", ((TextMessage) msg).getText());
- msg = consumer2.receive(1000);
+ msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
_logger.info("Receive message on consumer 1 :expecting null");
assertEquals(null, msg);
@@ -96,14 +104,15 @@ public class DurableSubscriptionTest extends QpidTestCase
producer.send(session1.createTextMessage("B"));
_logger.info("Receive message on consumer 1 :expecting B");
- msg = consumer1.receive();
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Message should have been received",msg);
assertEquals("B", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
- msg = consumer1.receive(1000);
+ msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
_logger.info("Receive message on consumer 2 :expecting null");
- msg = consumer2.receive(1000);
+ msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
_logger.info("Close connection");
@@ -143,14 +152,16 @@ public class DurableSubscriptionTest extends QpidTestCase
producer.send(session1.createTextMessage("A"));
Message msg;
- msg = consumer1.receive();
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Message should have been received",msg);
assertEquals("A", ((TextMessage) msg).getText());
- msg = consumer1.receive(1000);
+ msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
- msg = consumer2.receive();
+ msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Message should have been received",msg);
assertEquals("A", ((TextMessage) msg).getText());
- msg = consumer2.receive(1000);
+ msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
consumer2.close();
@@ -220,8 +231,8 @@ public class DurableSubscriptionTest extends QpidTestCase
msg = consumer1.receive(500);
assertNull("There should be no more messages for consumption on consumer1.", msg);
- msg = consumer2.receive();
- assertNotNull(msg);
+ msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Message should have been received",msg);
assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText());
msg = consumer2.receive(500);
assertNull("There should be no more messages for consumption on consumer2.", msg);
@@ -235,10 +246,10 @@ public class DurableSubscriptionTest extends QpidTestCase
producer.send(session0.createTextMessage("B"));
_logger.info("Receive message on consumer 1 :expecting B");
- msg = consumer1.receive(1000);
+ msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals("B", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
- msg = consumer1.receive(1000);
+ msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
// Re-attach a new consumer to the durable subscription, and check that it gets the message that it missed.
@@ -296,7 +307,7 @@ public class DurableSubscriptionTest extends QpidTestCase
producer.send(session.createTextMessage("testDurableWithInvalidSelector2"));
- Message msg = liveSubscriber.receive();
+ Message msg = liveSubscriber.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull ("Message should have been received", msg);
assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText());
assertNull("Should not receive subsequent message", liveSubscriber.receive(200));
@@ -331,7 +342,7 @@ public class DurableSubscriptionTest extends QpidTestCase
assertNotNull("Subscriber should have been created", liveSubscriber);
producer.send(session.createTextMessage("testDurableWithInvalidSelector2"));
- Message msg = liveSubscriber.receive();
+ Message msg = liveSubscriber.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull ("Message should have been received", msg);
assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText());
assertNull("Should not receive subsequent message", liveSubscriber.receive(200));
@@ -360,13 +371,13 @@ public class DurableSubscriptionTest extends QpidTestCase
// Send 1 matching message and 1 non-matching message
sendMatchingAndNonMatchingMessage(session, producer);
- Message rMsg = subA.receive(1000);
+ Message rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNotNull(rMsg);
assertEquals("Content was wrong",
"testResubscribeWithChangedSelector1",
((TextMessage) rMsg).getText());
- rMsg = subA.receive(1000);
+ rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNull(rMsg);
// Disconnect subscriber
@@ -379,13 +390,13 @@ public class DurableSubscriptionTest extends QpidTestCase
// Check messages are recieved properly
sendMatchingAndNonMatchingMessage(session, producer);
- rMsg = subB.receive(1000);
+ rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNotNull(rMsg);
assertEquals("Content was wrong",
"testResubscribeWithChangedSelector2",
((TextMessage) rMsg).getText());
- rMsg = subB.receive(1000);
+ rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNull(rMsg);
session.unsubscribe("testResubscribeWithChangedSelector");
}
@@ -429,5 +440,5 @@ public class DurableSubscriptionTest extends QpidTestCase
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(DurableSubscriptionTest.class);
- }
+ }
}