summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java48
1 files changed, 22 insertions, 26 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index e948aaffb3..a07e531b98 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -20,8 +20,15 @@
*/
package org.apache.qpid.test.unit.topic;
-import java.io.IOException;
-import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.management.common.JMXConnnectionFactory;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
import javax.jms.Connection;
import javax.jms.InvalidDestinationException;
@@ -37,15 +44,8 @@ import javax.jms.TopicSubscriber;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.management.common.JMXConnnectionFactory;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.Set;
/**
* @todo Code to check that a consumer gets only one particular method could be factored into a re-usable method (as
@@ -116,12 +116,10 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
_logger.info("Producer sending message A");
producer.send(session1.createTextMessage("A"));
-
- ((AMQSession<?, ?>) session1).sync();
-
+
//check the dur sub's underlying queue now has msg count 1
AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "MySubscription");
- assertEquals("Msg count should be 1", 1, ((AMQSession<?, ?>) session1).getQueueDepth(subQueue));
+ assertEquals("Msg count should be 1", 1, ((AMQSession<?, ?>) session1).getQueueDepth(subQueue, true));
Message msg;
_logger.info("Receive message on consumer 1:expecting A");
@@ -139,11 +137,9 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
_logger.info("Receive message on consumer 1 :expecting null");
assertEquals(null, msg);
-
- ((AMQSession<?, ?>) session2).sync();
-
+
//check the dur sub's underlying queue now has msg count 0
- assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session2).getQueueDepth(subQueue));
+ assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session2).getQueueDepth(subQueue, true));
consumer2.close();
_logger.info("Unsubscribe session2/consumer2");
@@ -151,7 +147,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
((AMQSession<?, ?>) session2).sync();
- if(isJavaBroker() && isExternalBroker())
+ if(isJavaBroker())
{
//Verify that the queue was deleted by querying for its JMX MBean
_jmxc = JMXConnnectionFactory.getJMXConnection(5000, "127.0.0.1",
@@ -635,7 +631,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
// should be 5 or 10 messages on queue now
// (5 for the java broker due to use of server side selectors, and 10 for the cpp broker due to client side selectors only)
AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "sameMessageSelector");
- assertEquals("Queue depth is wrong", isJavaBroker() ? 5 : 10, ((AMQSession<?, ?>) session).getQueueDepth(queue));
+ assertEquals("Queue depth is wrong", isJavaBroker() ? 5 : 10, ((AMQSession<?, ?>) session).getQueueDepth(queue, true));
// now recreate the durable subscriber and check the received messages
TopicSubscriber subTwo = session.createDurableSubscriber(topic, "sameMessageSelector", "testprop = TRUE", false);
@@ -721,11 +717,11 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
msg.setBooleanProperty("Match", false);
producer.send(msg);
- ((AMQSession)session).sync();
+
// should be 1 or 2 messages on queue now
// (1 for the java broker due to use of server side selectors, and 2 for the cpp broker due to client side selectors only)
AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorNoClose");
- assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession<?, ?>) session).getQueueDepth(queue));
+ assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession<?, ?>) session).getQueueDepth(queue, true));
conn.start();
@@ -739,7 +735,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
assertNull(rMsg);
// Check queue has no messages
- assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue));
+ assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue, true));
conn.close();
}
@@ -793,7 +789,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
// should be 1 or 2 messages on queue now
// (1 for the java broker due to use of server side selectors, and 2 for the cpp broker due to client side selectors only)
AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "subscriptionName");
- assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession<?, ?>) session).getQueueDepth(queue));
+ assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession<?, ?>) session).getQueueDepth(queue, true));
conn.start();
@@ -807,7 +803,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
assertNull(rMsg);
// Check queue has no messages
- assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue));
+ assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue, true));
conn.close();
}