summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java92
1 files changed, 90 insertions, 2 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index cafd212dd3..119949b0d6 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -20,8 +20,13 @@
*/
package org.apache.qpid.test.unit.topic;
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.qpid.management.common.JMXConnnectionFactory;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
@@ -39,6 +44,9 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
/**
* @todo Code to check that a consumer gets only one particular method could be factored into a re-usable method (as
@@ -58,6 +66,36 @@ public class DurableSubscriptionTest extends QpidTestCase
/** Timeout for receive() if we are not expecting a message */
private static final long NEGATIVE_RECEIVE_TIMEOUT = 1000;
+ private JMXConnector _jmxc;
+ private MBeanServerConnection _mbsc;
+ private static final String USER = "admin";
+ private static final String PASSWORD = "admin";
+ private boolean _jmxConnected;
+
+ public void setUp() throws Exception
+ {
+ setConfigurationProperty("management.enabled", "true");
+ _jmxConnected=false;
+ super.setUp();
+ }
+
+ public void tearDown() throws Exception
+ {
+ if(_jmxConnected)
+ {
+ try
+ {
+ _jmxc.close();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ super.tearDown();
+ }
+
public void testUnsubscribe() throws Exception
{
AMQConnection con = (AMQConnection) getConnection("guest", "guest");
@@ -79,6 +117,12 @@ public class DurableSubscriptionTest extends QpidTestCase
_logger.info("Producer sending message A");
producer.send(session1.createTextMessage("A"));
+
+ ((AMQSession)session1).sync();
+
+ //check the dur sub's underlying queue now has msg count 1
+ AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "MySubscription");
+ assertEquals("Msg count should be 1", 1, ((AMQSession)session1).getQueueDepth(subQueue));
Message msg;
_logger.info("Receive message on consumer 1:expecting A");
@@ -96,11 +140,46 @@ public class DurableSubscriptionTest extends QpidTestCase
msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
_logger.info("Receive message on consumer 1 :expecting null");
assertEquals(null, msg);
+
+ ((AMQSession)session2).sync();
+
+ //check the dur sub's underlying queue now has msg count 0
+ assertEquals("Msg count should be 0", 0, ((AMQSession)session2).getQueueDepth(subQueue));
consumer2.close();
_logger.info("Unsubscribe session2/consumer2");
session2.unsubscribe("MySubscription");
-
+
+ ((AMQSession)session2).sync();
+
+ if(isJavaBroker() && isExternalBroker())
+ {
+ //Verify that the queue was deleted by querying for its JMX MBean
+ _jmxc = JMXConnnectionFactory.getJMXConnection(5000, "127.0.0.1",
+ getManagementPort(getPort()), USER, PASSWORD);
+
+ _jmxConnected = true;
+ _mbsc = _jmxc.getMBeanServerConnection();
+
+ //must replace the occurrence of ':' in queue name with '-'
+ String queueObjectNameText = "clientid" + "-" + "MySubscription";
+
+ ObjectName objName = new ObjectName("org.apache.qpid:type=VirtualHost.Queue,name="
+ + queueObjectNameText + ",*");
+
+ Set<ObjectName> objectInstances = _mbsc.queryNames(objName, null);
+
+ if(objectInstances.size() != 0)
+ {
+ fail("Queue MBean was found. Expected queue to have been deleted");
+ }
+ else
+ {
+ _logger.info("Underlying dueue for the durable subscription was confirmed deleted.");
+ }
+ }
+
+ //verify unsubscribing the durable subscriber did not affect the non-durable one
_logger.info("Producer sending message B");
producer.send(session1.createTextMessage("B"));
@@ -459,6 +538,9 @@ public class DurableSubscriptionTest extends QpidTestCase
rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNull(rMsg);
+ // Send another 1 matching message and 1 non-matching message
+ sendMatchingAndNonMatchingMessage(session, producer);
+
// Disconnect subscriber
subA.close();
@@ -466,9 +548,15 @@ public class DurableSubscriptionTest extends QpidTestCase
TopicSubscriber subB = session.createDurableSubscriber(topic,
"testResubscribeWithChangedSelector","Match = False", false);
+ //verify no messages are now present as changing selector should have issued
+ //an unsubscribe and thus deleted the previous backing queue for the subscription.
+ rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
+ assertNull("Should not have received message as the queue underlying the " +
+ "subscription should have been cleared/deleted when the selector was changed", rMsg);
- // Check messages are recieved properly
+ // Check that new messages are received properly
sendMatchingAndNonMatchingMessage(session, producer);
+
rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNotNull(rMsg);
assertEquals("Content was wrong",