diff options
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java')
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java | 146 |
1 files changed, 113 insertions, 33 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index a9ac028af6..4b61b6269c 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -24,6 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQNoRouteException; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; @@ -31,6 +32,7 @@ import org.apache.qpid.management.common.JMXConnnectionFactory; import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.Connection; +import javax.jms.ExceptionListener; import javax.jms.InvalidDestinationException; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; @@ -46,6 +48,8 @@ import javax.management.ObjectName; import javax.management.remote.JMXConnector; import java.io.IOException; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** * @todo Code to check that a consumer gets only one particular method could be factored into a re-usable method (as @@ -58,7 +62,11 @@ import java.util.Set; public class DurableSubscriptionTest extends QpidBrokerTestCase { private static final Logger _logger = LoggerFactory.getLogger(DurableSubscriptionTest.class); - + + private static final String MY_TOPIC = "MyTopic"; + + private static final String MY_SUBSCRIPTION = "MySubscription"; + /** Timeout for receive() if we are expecting a message */ private static final long POSITIVE_RECEIVE_TIMEOUT = 2000; @@ -80,24 +88,29 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase public void tearDown() throws Exception { - if(_jmxConnected) + try { - try + if(_jmxConnected) { - _jmxc.close(); - } - catch (IOException e) - { - e.printStackTrace(); + try + { + _jmxc.close(); + } + catch (IOException e) + { + _logger.error("Error closing JMX connection", e); + } } } - - super.tearDown(); + finally + { + super.tearDown(); + } } - + public void testUnsubscribe() throws Exception { - AMQConnection con = (AMQConnection) getConnection("guest", "guest"); + AMQConnection con = (AMQConnection) getConnection(); AMQTopic topic = new AMQTopic(con, "MyDurableSubscriptionTestTopic"); _logger.info("Create Session 1"); Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); @@ -109,7 +122,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase _logger.info("Create Session 2"); Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); _logger.info("Create Durable Subscriber on Session 2"); - TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription"); + TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, MY_SUBSCRIPTION); _logger.info("Starting connection"); con.start(); @@ -118,7 +131,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase producer.send(session1.createTextMessage("A")); //check the dur sub's underlying queue now has msg count 1 - AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "MySubscription"); + AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + MY_SUBSCRIPTION); assertEquals("Msg count should be 1", 1, ((AMQSession<?, ?>) session1).getQueueDepth(subQueue, true)); Message msg; @@ -143,7 +156,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase consumer2.close(); _logger.info("Unsubscribe session2/consumer2"); - session2.unsubscribe("MySubscription"); + session2.unsubscribe(MY_SUBSCRIPTION); ((AMQSession<?, ?>) session2).sync(); @@ -157,7 +170,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase _mbsc = _jmxc.getMBeanServerConnection(); //must replace the occurrence of ':' in queue name with '-' - String queueObjectNameText = "clientid" + "-" + "MySubscription"; + String queueObjectNameText = "clientid" + "-" + MY_SUBSCRIPTION; ObjectName objName = new ObjectName("org.apache.qpid:type=VirtualHost.Queue,name=" + queueObjectNameText + ",*"); @@ -189,7 +202,74 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase _logger.info("Close connection"); con.close(); } - + + + /** + * Specifically uses a subscriber with a selector because QPID-4731 found that selectors + * can prevent queue removal. + */ + public void testUnsubscribeWhenUsingSelectorMakesTopicUnreachable() throws Exception + { + setTestClientSystemProperty("qpid.default_mandatory_topic","true"); + + // set up subscription + AMQConnection connection = (AMQConnection) getConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = new AMQTopic(connection, MY_TOPIC); + MessageProducer producer = session.createProducer(topic); + + TopicSubscriber subscriber = session.createDurableSubscriber(topic, MY_SUBSCRIPTION, "1 = 1", false); + StoringExceptionListener exceptionListener = new StoringExceptionListener(); + connection.setExceptionListener(exceptionListener); + + // send message and verify it was consumed + producer.send(session.createTextMessage("message1")); + assertNotNull("Message should have been successfully received", subscriber.receive(POSITIVE_RECEIVE_TIMEOUT)); + assertEquals(null, exceptionListener.getException()); + session.unsubscribe(MY_SUBSCRIPTION); + + // send another message and verify that the connection exception listener was fired. + StoringExceptionListener exceptionListener2 = new StoringExceptionListener(); + connection.setExceptionListener(exceptionListener2); + + producer.send(session.createTextMessage("message that should be unroutable")); + ((AMQSession<?, ?>) session).sync(); + + JMSException exception = exceptionListener2.awaitException(); + assertNotNull("Expected exception as message should no longer be routable", exception); + + Throwable linkedException = exception.getLinkedException(); + assertNotNull("The linked exception of " + exception + " should be the 'no route' exception", linkedException); + assertEquals(AMQNoRouteException.class, linkedException.getClass()); + } + + private final class StoringExceptionListener implements ExceptionListener + { + private volatile JMSException _exception; + private CountDownLatch _latch = new CountDownLatch(1); + + @Override + public void onException(JMSException exception) + { + _exception = exception; + _logger.info("Exception listener received: " + exception); + _latch.countDown(); + } + + public JMSException awaitException() throws InterruptedException + { + _latch.await(POSITIVE_RECEIVE_TIMEOUT, TimeUnit.MILLISECONDS); + return _exception; + } + + public JMSException getException() + { + return _exception; + } + } + public void testDurabilityNOACK() throws Exception { durabilityImpl(AMQSession.NO_ACKNOWLEDGE, false); @@ -223,8 +303,8 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase private void durabilityImpl(int ackMode, boolean restartBroker) throws Exception { - AMQConnection con = (AMQConnection) getConnection("guest", "guest"); - AMQTopic topic = new AMQTopic(con, "MyTopic"); + AMQConnection con = (AMQConnection) getConnection(); + AMQTopic topic = new AMQTopic(con, MY_TOPIC); Session session1 = con.createSession(false, ackMode); MessageConsumer consumer1 = session1.createConsumer(topic); @@ -232,7 +312,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase MessageProducer producer = sessionProd.createProducer(topic); Session session2 = con.createSession(false, ackMode); - TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription"); + TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, MY_SUBSCRIPTION); con.start(); @@ -267,13 +347,13 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase consumer2.close(); session2.close(); - + //Send message C, then connect consumer 3 to durable subscription and get //message B if not using NO_ACK, then receive C with consumer 1 and 3 producer.send(session1.createTextMessage("C")); Session session3 = con.createSession(false, ackMode); - MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "MySubscription"); + MessageConsumer consumer3 = session3.createDurableSubscriber(topic, MY_SUBSCRIPTION); if(ackMode == AMQSession.NO_ACKNOWLEDGE) { @@ -307,7 +387,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase consumer1.close(); consumer3.close(); - session3.unsubscribe("MySubscription"); + session3.unsubscribe(MY_SUBSCRIPTION); con.close(); @@ -328,28 +408,28 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase { Message msg; // Create producer. - AMQConnection con0 = (AMQConnection) getConnection("guest", "guest"); + AMQConnection con0 = (AMQConnection) getConnection(); con0.start(); Session session0 = con0.createSession(false, ackMode); - AMQTopic topic = new AMQTopic(con0, "MyTopic"); + AMQTopic topic = new AMQTopic(con0, MY_TOPIC); Session sessionProd = con0.createSession(false, ackMode); MessageProducer producer = sessionProd.createProducer(topic); // Create consumer 1. - AMQConnection con1 = (AMQConnection) getConnection("guest", "guest"); + AMQConnection con1 = (AMQConnection) getConnection(); con1.start(); Session session1 = con1.createSession(false, ackMode); MessageConsumer consumer1 = session1.createConsumer(topic); // Create consumer 2. - AMQConnection con2 = (AMQConnection) getConnection("guest", "guest"); + AMQConnection con2 = (AMQConnection) getConnection(); con2.start(); Session session2 = con2.createSession(false, ackMode); - TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription"); + TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, MY_SUBSCRIPTION); // Send message and check that both consumers get it and only it. producer.send(session0.createTextMessage("A")); @@ -393,11 +473,11 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase // Re-attach a new consumer to the durable subscription, and check that it gets message B it left (if not NO_ACK) // and also gets message C sent after it was disconnected. - AMQConnection con3 = (AMQConnection) getConnection("guest", "guest"); + AMQConnection con3 = (AMQConnection) getConnection(); con3.start(); Session session3 = con3.createSession(false, ackMode); - TopicSubscriber consumer3 = session3.createDurableSubscriber(topic, "MySubscription"); + TopicSubscriber consumer3 = session3.createDurableSubscriber(topic, MY_SUBSCRIPTION); if(ackMode == AMQSession.NO_ACKNOWLEDGE) { @@ -423,7 +503,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase consumer1.close(); consumer3.close(); - session3.unsubscribe("MySubscription"); + session3.unsubscribe(MY_SUBSCRIPTION); con0.close(); con1.close(); @@ -540,7 +620,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase TopicSubscriber subB = session.createDurableSubscriber(topic, "testResubscribeWithChangedSelector","Match = False", false); - //verify no messages are now recieved. + //verify no messages are now received. rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNull("Should not have received message as the selector was changed", rMsg); @@ -746,7 +826,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase * <li>create another durable subscriber with a selector and same name * <li>check first subscriber is now closed * <li>create a publisher and send messages - * <li>check messages are recieved correctly + * <li>check messages are received correctly * </ul> * <p> * QPID-2418 |