summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java146
1 files changed, 113 insertions, 33 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index a9ac028af6..4b61b6269c 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQNoRouteException;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
@@ -31,6 +32,7 @@ import org.apache.qpid.management.common.JMXConnnectionFactory;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import javax.jms.Connection;
+import javax.jms.ExceptionListener;
import javax.jms.InvalidDestinationException;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
@@ -46,6 +48,8 @@ import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import java.io.IOException;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
/**
* @todo Code to check that a consumer gets only one particular method could be factored into a re-usable method (as
@@ -58,7 +62,11 @@ import java.util.Set;
public class DurableSubscriptionTest extends QpidBrokerTestCase
{
private static final Logger _logger = LoggerFactory.getLogger(DurableSubscriptionTest.class);
-
+
+ private static final String MY_TOPIC = "MyTopic";
+
+ private static final String MY_SUBSCRIPTION = "MySubscription";
+
/** Timeout for receive() if we are expecting a message */
private static final long POSITIVE_RECEIVE_TIMEOUT = 2000;
@@ -80,24 +88,29 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
public void tearDown() throws Exception
{
- if(_jmxConnected)
+ try
{
- try
+ if(_jmxConnected)
{
- _jmxc.close();
- }
- catch (IOException e)
- {
- e.printStackTrace();
+ try
+ {
+ _jmxc.close();
+ }
+ catch (IOException e)
+ {
+ _logger.error("Error closing JMX connection", e);
+ }
}
}
-
- super.tearDown();
+ finally
+ {
+ super.tearDown();
+ }
}
-
+
public void testUnsubscribe() throws Exception
{
- AMQConnection con = (AMQConnection) getConnection("guest", "guest");
+ AMQConnection con = (AMQConnection) getConnection();
AMQTopic topic = new AMQTopic(con, "MyDurableSubscriptionTestTopic");
_logger.info("Create Session 1");
Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
@@ -109,7 +122,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
_logger.info("Create Session 2");
Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
_logger.info("Create Durable Subscriber on Session 2");
- TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
+ TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, MY_SUBSCRIPTION);
_logger.info("Starting connection");
con.start();
@@ -118,7 +131,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
producer.send(session1.createTextMessage("A"));
//check the dur sub's underlying queue now has msg count 1
- AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "MySubscription");
+ AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + MY_SUBSCRIPTION);
assertEquals("Msg count should be 1", 1, ((AMQSession<?, ?>) session1).getQueueDepth(subQueue, true));
Message msg;
@@ -143,7 +156,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
consumer2.close();
_logger.info("Unsubscribe session2/consumer2");
- session2.unsubscribe("MySubscription");
+ session2.unsubscribe(MY_SUBSCRIPTION);
((AMQSession<?, ?>) session2).sync();
@@ -157,7 +170,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
_mbsc = _jmxc.getMBeanServerConnection();
//must replace the occurrence of ':' in queue name with '-'
- String queueObjectNameText = "clientid" + "-" + "MySubscription";
+ String queueObjectNameText = "clientid" + "-" + MY_SUBSCRIPTION;
ObjectName objName = new ObjectName("org.apache.qpid:type=VirtualHost.Queue,name="
+ queueObjectNameText + ",*");
@@ -189,7 +202,74 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
_logger.info("Close connection");
con.close();
}
-
+
+
+ /**
+ * Specifically uses a subscriber with a selector because QPID-4731 found that selectors
+ * can prevent queue removal.
+ */
+ public void testUnsubscribeWhenUsingSelectorMakesTopicUnreachable() throws Exception
+ {
+ setTestClientSystemProperty("qpid.default_mandatory_topic","true");
+
+ // set up subscription
+ AMQConnection connection = (AMQConnection) getConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = new AMQTopic(connection, MY_TOPIC);
+ MessageProducer producer = session.createProducer(topic);
+
+ TopicSubscriber subscriber = session.createDurableSubscriber(topic, MY_SUBSCRIPTION, "1 = 1", false);
+ StoringExceptionListener exceptionListener = new StoringExceptionListener();
+ connection.setExceptionListener(exceptionListener);
+
+ // send message and verify it was consumed
+ producer.send(session.createTextMessage("message1"));
+ assertNotNull("Message should have been successfully received", subscriber.receive(POSITIVE_RECEIVE_TIMEOUT));
+ assertEquals(null, exceptionListener.getException());
+ session.unsubscribe(MY_SUBSCRIPTION);
+
+ // send another message and verify that the connection exception listener was fired.
+ StoringExceptionListener exceptionListener2 = new StoringExceptionListener();
+ connection.setExceptionListener(exceptionListener2);
+
+ producer.send(session.createTextMessage("message that should be unroutable"));
+ ((AMQSession<?, ?>) session).sync();
+
+ JMSException exception = exceptionListener2.awaitException();
+ assertNotNull("Expected exception as message should no longer be routable", exception);
+
+ Throwable linkedException = exception.getLinkedException();
+ assertNotNull("The linked exception of " + exception + " should be the 'no route' exception", linkedException);
+ assertEquals(AMQNoRouteException.class, linkedException.getClass());
+ }
+
+ private final class StoringExceptionListener implements ExceptionListener
+ {
+ private volatile JMSException _exception;
+ private CountDownLatch _latch = new CountDownLatch(1);
+
+ @Override
+ public void onException(JMSException exception)
+ {
+ _exception = exception;
+ _logger.info("Exception listener received: " + exception);
+ _latch.countDown();
+ }
+
+ public JMSException awaitException() throws InterruptedException
+ {
+ _latch.await(POSITIVE_RECEIVE_TIMEOUT, TimeUnit.MILLISECONDS);
+ return _exception;
+ }
+
+ public JMSException getException()
+ {
+ return _exception;
+ }
+ }
+
public void testDurabilityNOACK() throws Exception
{
durabilityImpl(AMQSession.NO_ACKNOWLEDGE, false);
@@ -223,8 +303,8 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
private void durabilityImpl(int ackMode, boolean restartBroker) throws Exception
{
- AMQConnection con = (AMQConnection) getConnection("guest", "guest");
- AMQTopic topic = new AMQTopic(con, "MyTopic");
+ AMQConnection con = (AMQConnection) getConnection();
+ AMQTopic topic = new AMQTopic(con, MY_TOPIC);
Session session1 = con.createSession(false, ackMode);
MessageConsumer consumer1 = session1.createConsumer(topic);
@@ -232,7 +312,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
MessageProducer producer = sessionProd.createProducer(topic);
Session session2 = con.createSession(false, ackMode);
- TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
+ TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, MY_SUBSCRIPTION);
con.start();
@@ -267,13 +347,13 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
consumer2.close();
session2.close();
-
+
//Send message C, then connect consumer 3 to durable subscription and get
//message B if not using NO_ACK, then receive C with consumer 1 and 3
producer.send(session1.createTextMessage("C"));
Session session3 = con.createSession(false, ackMode);
- MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
+ MessageConsumer consumer3 = session3.createDurableSubscriber(topic, MY_SUBSCRIPTION);
if(ackMode == AMQSession.NO_ACKNOWLEDGE)
{
@@ -307,7 +387,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
consumer1.close();
consumer3.close();
- session3.unsubscribe("MySubscription");
+ session3.unsubscribe(MY_SUBSCRIPTION);
con.close();
@@ -328,28 +408,28 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
{
Message msg;
// Create producer.
- AMQConnection con0 = (AMQConnection) getConnection("guest", "guest");
+ AMQConnection con0 = (AMQConnection) getConnection();
con0.start();
Session session0 = con0.createSession(false, ackMode);
- AMQTopic topic = new AMQTopic(con0, "MyTopic");
+ AMQTopic topic = new AMQTopic(con0, MY_TOPIC);
Session sessionProd = con0.createSession(false, ackMode);
MessageProducer producer = sessionProd.createProducer(topic);
// Create consumer 1.
- AMQConnection con1 = (AMQConnection) getConnection("guest", "guest");
+ AMQConnection con1 = (AMQConnection) getConnection();
con1.start();
Session session1 = con1.createSession(false, ackMode);
MessageConsumer consumer1 = session1.createConsumer(topic);
// Create consumer 2.
- AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
+ AMQConnection con2 = (AMQConnection) getConnection();
con2.start();
Session session2 = con2.createSession(false, ackMode);
- TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
+ TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, MY_SUBSCRIPTION);
// Send message and check that both consumers get it and only it.
producer.send(session0.createTextMessage("A"));
@@ -393,11 +473,11 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
// Re-attach a new consumer to the durable subscription, and check that it gets message B it left (if not NO_ACK)
// and also gets message C sent after it was disconnected.
- AMQConnection con3 = (AMQConnection) getConnection("guest", "guest");
+ AMQConnection con3 = (AMQConnection) getConnection();
con3.start();
Session session3 = con3.createSession(false, ackMode);
- TopicSubscriber consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
+ TopicSubscriber consumer3 = session3.createDurableSubscriber(topic, MY_SUBSCRIPTION);
if(ackMode == AMQSession.NO_ACKNOWLEDGE)
{
@@ -423,7 +503,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
consumer1.close();
consumer3.close();
- session3.unsubscribe("MySubscription");
+ session3.unsubscribe(MY_SUBSCRIPTION);
con0.close();
con1.close();
@@ -540,7 +620,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
TopicSubscriber subB = session.createDurableSubscriber(topic,
"testResubscribeWithChangedSelector","Match = False", false);
- //verify no messages are now recieved.
+ //verify no messages are now received.
rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNull("Should not have received message as the selector was changed", rMsg);
@@ -746,7 +826,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
* <li>create another durable subscriber with a selector and same name
* <li>check first subscriber is now closed
* <li>create a publisher and send messages
- * <li>check messages are recieved correctly
+ * <li>check messages are received correctly
* </ul>
* <p>
* QPID-2418