diff options
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java')
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java | 92 |
1 files changed, 90 insertions, 2 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index cafd212dd3..119949b0d6 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -20,8 +20,13 @@ */ package org.apache.qpid.test.unit.topic; +import java.io.IOException; +import java.util.Set; + +import org.apache.qpid.management.common.JMXConnnectionFactory; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; @@ -39,6 +44,9 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicSubscriber; +import javax.management.MBeanServerConnection; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; /** * @todo Code to check that a consumer gets only one particular method could be factored into a re-usable method (as @@ -58,6 +66,36 @@ public class DurableSubscriptionTest extends QpidTestCase /** Timeout for receive() if we are not expecting a message */ private static final long NEGATIVE_RECEIVE_TIMEOUT = 1000; + private JMXConnector _jmxc; + private MBeanServerConnection _mbsc; + private static final String USER = "admin"; + private static final String PASSWORD = "admin"; + private boolean _jmxConnected; + + public void setUp() throws Exception + { + setConfigurationProperty("management.enabled", "true"); + _jmxConnected=false; + super.setUp(); + } + + public void tearDown() throws Exception + { + if(_jmxConnected) + { + try + { + _jmxc.close(); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + + super.tearDown(); + } + public void testUnsubscribe() throws Exception { AMQConnection con = (AMQConnection) getConnection("guest", "guest"); @@ -79,6 +117,12 @@ public class DurableSubscriptionTest extends QpidTestCase _logger.info("Producer sending message A"); producer.send(session1.createTextMessage("A")); + + ((AMQSession)session1).sync(); + + //check the dur sub's underlying queue now has msg count 1 + AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "MySubscription"); + assertEquals("Msg count should be 1", 1, ((AMQSession)session1).getQueueDepth(subQueue)); Message msg; _logger.info("Receive message on consumer 1:expecting A"); @@ -96,11 +140,46 @@ public class DurableSubscriptionTest extends QpidTestCase msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT); _logger.info("Receive message on consumer 1 :expecting null"); assertEquals(null, msg); + + ((AMQSession)session2).sync(); + + //check the dur sub's underlying queue now has msg count 0 + assertEquals("Msg count should be 0", 0, ((AMQSession)session2).getQueueDepth(subQueue)); consumer2.close(); _logger.info("Unsubscribe session2/consumer2"); session2.unsubscribe("MySubscription"); - + + ((AMQSession)session2).sync(); + + if(isJavaBroker() && isExternalBroker()) + { + //Verify that the queue was deleted by querying for its JMX MBean + _jmxc = JMXConnnectionFactory.getJMXConnection(5000, "127.0.0.1", + getManagementPort(getPort()), USER, PASSWORD); + + _jmxConnected = true; + _mbsc = _jmxc.getMBeanServerConnection(); + + //must replace the occurrence of ':' in queue name with '-' + String queueObjectNameText = "clientid" + "-" + "MySubscription"; + + ObjectName objName = new ObjectName("org.apache.qpid:type=VirtualHost.Queue,name=" + + queueObjectNameText + ",*"); + + Set<ObjectName> objectInstances = _mbsc.queryNames(objName, null); + + if(objectInstances.size() != 0) + { + fail("Queue MBean was found. Expected queue to have been deleted"); + } + else + { + _logger.info("Underlying dueue for the durable subscription was confirmed deleted."); + } + } + + //verify unsubscribing the durable subscriber did not affect the non-durable one _logger.info("Producer sending message B"); producer.send(session1.createTextMessage("B")); @@ -459,6 +538,9 @@ public class DurableSubscriptionTest extends QpidTestCase rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNull(rMsg); + // Send another 1 matching message and 1 non-matching message + sendMatchingAndNonMatchingMessage(session, producer); + // Disconnect subscriber subA.close(); @@ -466,9 +548,15 @@ public class DurableSubscriptionTest extends QpidTestCase TopicSubscriber subB = session.createDurableSubscriber(topic, "testResubscribeWithChangedSelector","Match = False", false); + //verify no messages are now present as changing selector should have issued + //an unsubscribe and thus deleted the previous backing queue for the subscription. + rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT); + assertNull("Should not have received message as the queue underlying the " + + "subscription should have been cleared/deleted when the selector was changed", rMsg); - // Check messages are recieved properly + // Check that new messages are received properly sendMatchingAndNonMatchingMessage(session, producer); + rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNotNull(rMsg); assertEquals("Content was wrong", |