diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2012-03-10 19:22:10 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2012-03-10 19:22:10 +0000 |
commit | 4eaa4e42093e5524d9552d8fa312c214524b6bb4 (patch) | |
tree | a251d57ee92d9c779fe4455c583be0ed90e69a43 /qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java | |
parent | 92be7e8f3163c048a8642d2deeaa921bbb65dc9c (diff) | |
download | qpid-python-rg-amqp-1-0-sandbox.tar.gz |
NO-JIRA : AMQP-1-0 sandbox updates - merge from trunkrg-amqp-1-0-sandbox
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1299257 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java')
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java | 48 |
1 files changed, 22 insertions, 26 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index e948aaffb3..a07e531b98 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -20,8 +20,15 @@ */ package org.apache.qpid.test.unit.topic; -import java.io.IOException; -import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.management.common.JMXConnnectionFactory; +import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.Connection; import javax.jms.InvalidDestinationException; @@ -37,15 +44,8 @@ import javax.jms.TopicSubscriber; import javax.management.MBeanServerConnection; import javax.management.ObjectName; import javax.management.remote.JMXConnector; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.management.common.JMXConnnectionFactory; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.Set; /** * @todo Code to check that a consumer gets only one particular method could be factored into a re-usable method (as @@ -116,12 +116,10 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase _logger.info("Producer sending message A"); producer.send(session1.createTextMessage("A")); - - ((AMQSession<?, ?>) session1).sync(); - + //check the dur sub's underlying queue now has msg count 1 AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "MySubscription"); - assertEquals("Msg count should be 1", 1, ((AMQSession<?, ?>) session1).getQueueDepth(subQueue)); + assertEquals("Msg count should be 1", 1, ((AMQSession<?, ?>) session1).getQueueDepth(subQueue, true)); Message msg; _logger.info("Receive message on consumer 1:expecting A"); @@ -139,11 +137,9 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT); _logger.info("Receive message on consumer 1 :expecting null"); assertEquals(null, msg); - - ((AMQSession<?, ?>) session2).sync(); - + //check the dur sub's underlying queue now has msg count 0 - assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session2).getQueueDepth(subQueue)); + assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session2).getQueueDepth(subQueue, true)); consumer2.close(); _logger.info("Unsubscribe session2/consumer2"); @@ -151,7 +147,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase ((AMQSession<?, ?>) session2).sync(); - if(isJavaBroker() && isExternalBroker()) + if(isJavaBroker()) { //Verify that the queue was deleted by querying for its JMX MBean _jmxc = JMXConnnectionFactory.getJMXConnection(5000, "127.0.0.1", @@ -635,7 +631,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase // should be 5 or 10 messages on queue now // (5 for the java broker due to use of server side selectors, and 10 for the cpp broker due to client side selectors only) AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "sameMessageSelector"); - assertEquals("Queue depth is wrong", isJavaBroker() ? 5 : 10, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + assertEquals("Queue depth is wrong", isJavaBroker() ? 5 : 10, ((AMQSession<?, ?>) session).getQueueDepth(queue, true)); // now recreate the durable subscriber and check the received messages TopicSubscriber subTwo = session.createDurableSubscriber(topic, "sameMessageSelector", "testprop = TRUE", false); @@ -721,11 +717,11 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2"); msg.setBooleanProperty("Match", false); producer.send(msg); - ((AMQSession)session).sync(); + // should be 1 or 2 messages on queue now // (1 for the java broker due to use of server side selectors, and 2 for the cpp broker due to client side selectors only) AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorNoClose"); - assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession<?, ?>) session).getQueueDepth(queue, true)); conn.start(); @@ -739,7 +735,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase assertNull(rMsg); // Check queue has no messages - assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue, true)); conn.close(); } @@ -793,7 +789,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase // should be 1 or 2 messages on queue now // (1 for the java broker due to use of server side selectors, and 2 for the cpp broker due to client side selectors only) AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "subscriptionName"); - assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession<?, ?>) session).getQueueDepth(queue, true)); conn.start(); @@ -807,7 +803,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase assertNull(rMsg); // Check queue has no messages - assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue, true)); conn.close(); } |