summaryrefslogtreecommitdiff
path: root/qpid/java/systests
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2010-09-05 18:50:31 +0000
committerRobert Gemmell <robbie@apache.org>2010-09-05 18:50:31 +0000
commitba143bd7b07e39ce07f1fb7bbf3cd107a515b469 (patch)
treea0907a624a16c0d6fb2d04f32a6da8133f6ba5f5 /qpid/java/systests
parente833204097e0df021fe9bc27785faa4769441d70 (diff)
downloadqpid-python-ba143bd7b07e39ce07f1fb7bbf3cd107a515b469.tar.gz
QPID-2418: Unsubscribe existing open durable subscriptions when changing subscription. Remove duplication in implementations.
Applied patch from Andrew Kennedy <andrew.international@gmail.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@992855 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java10
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java71
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java272
3 files changed, 307 insertions, 46 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
index cbc2078571..abb0781536 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
@@ -21,6 +21,9 @@
package org.apache.qpid.server.queue;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -35,14 +38,11 @@ import junit.framework.Assert;
import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.Condition;
-
public class TimeToLiveTest extends QpidBrokerTestCase
{
private static final Logger _logger = Logger.getLogger(TimeToLiveTest.class);
@@ -253,7 +253,7 @@ public class TimeToLiveTest extends QpidBrokerTestCase
producerSession.commit();
//resubscribe
- durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName());
+ durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName(),"testprop='TimeToLiveTest'", false);
// Ensure we sleep the required amount of time.
ReentrantLock waitLock = new ReentrantLock();
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
index 3030572e13..989ac98747 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
@@ -17,7 +17,17 @@
*/
package org.apache.qpid.test.unit.ct;
-import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
@@ -75,7 +85,7 @@ public class DurableSubscriberTest extends QpidBrokerTestCase
}
catch (Exception e)
{
- System.out.println("problems shutting down arjuna-ms");
+ _logger.error("problems restarting broker: " + e);
throw e;
}
//now recreate the durable subscriber and check the received messages
@@ -102,7 +112,7 @@ public class DurableSubscriberTest extends QpidBrokerTestCase
* create and register a durable subscriber with a message selector and then close it
* crash the broker
* create a publisher and send 5 right messages and 5 wrong messages
- * recreate the durable subscriber and check the received the 5 expected messages
+ * recreate the durable subscriber and check we receive the 5 expected messages
*/
public void testDurSubRestoresMessageSelector() throws Exception
{
@@ -125,7 +135,7 @@ public class DurableSubscriberTest extends QpidBrokerTestCase
}
catch (Exception e)
{
- System.out.println("problems shutting down arjuna-ms");
+ _logger.error("problems restarting broker: " + e);
throw e;
}
topic = (Topic) getInitialContext().lookup(_topicName);
@@ -148,7 +158,7 @@ public class DurableSubscriberTest extends QpidBrokerTestCase
//now recreate the durable subscriber and check the received messages
TopicConnection durConnection2 = factory.createTopicConnection("guest", "guest");
TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, "dursub");
+ TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, "dursub", "testprop='true'", false);
durConnection2.start();
for (int i = 0; i < 5; i++)
{
@@ -385,6 +395,10 @@ public class DurableSubscriberTest extends QpidBrokerTestCase
assertEquals("Content was wrong",
"testResubscribeWithChangedSelectorAndRestart1",
((TextMessage) rMsg).getText());
+
+ // Queue has no messages left
+ AMQQueue subQueueTmp = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart");
+ assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session).getQueueDepth(subQueueTmp));
rMsg = subA.receive(1000);
assertNull(rMsg);
@@ -403,14 +417,14 @@ public class DurableSubscriberTest extends QpidBrokerTestCase
// Reconnect with new selector that matches B
TopicSubscriber subB = session.createDurableSubscriber(topic,
- "testResubscribeWithChangedSelectorAndRestart","Match = False", false);
+ "testResubscribeWithChangedSelectorAndRestart",
+ "Match = false", false);
//verify no messages are now present on the queue as changing selector should have issued
//an unsubscribe and thus deleted the previous durable backing queue for the subscription.
- //check the dur sub's underlying queue now has msg count 1
- AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelector");
- assertEquals("Msg count should be 0", 0, ((AMQSession)session).getQueueDepth(subQueue));
-
+ //check the dur sub's underlying queue now has msg count 0
+ AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart");
+ assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session).getQueueDepth(subQueue));
// Check that new messages are received properly
msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
@@ -429,6 +443,10 @@ public class DurableSubscriberTest extends QpidBrokerTestCase
rMsg = subB.receive(1000);
assertNull(rMsg);
+ //check the dur sub's underlying queue now has msg count 0
+ subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart");
+ assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session).getQueueDepth(subQueue));
+
//now restart the server
try
{
@@ -440,28 +458,49 @@ public class DurableSubscriberTest extends QpidBrokerTestCase
throw e;
}
- // Check that new messages are still received properly
+ // Reconnect to broker
+ Connection connection = getConnectionFactory().createConnection("guest", "guest");
+ connection.start();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ topic = new AMQTopic((AMQConnection) connection, "testResubscribeWithChangedSelectorAndRestart");
+ producer = session.createProducer(topic);
+
+ //verify no messages now present on the queue after we restart the broker
+ //check the dur sub's underlying queue now has msg count 0
+ subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart");
+ assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session).getQueueDepth(subQueue));
+
+ // Reconnect with new selector that matches B
+ TopicSubscriber subC = session.createDurableSubscriber(topic,
+ "testResubscribeWithChangedSelectorAndRestart",
+ "Match = False", false);
+
+ // Check that new messages are still sent and recieved properly
msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
msg.setBooleanProperty("Match", true);
producer.send(msg);
msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
msg.setBooleanProperty("Match", false);
producer.send(msg);
+
+ //check the dur sub's underlying queue now has msg count 1
+ subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart");
+ assertEquals("Msg count should be 1", 1, ((AMQSession<?, ?>) session).getQueueDepth(subQueue));
- rMsg = subB.receive(1000);
+ rMsg = subC.receive(1000);
assertNotNull(rMsg);
assertEquals("Content was wrong",
"testResubscribeWithChangedSelectorAndRestart2",
((TextMessage) rMsg).getText());
- rMsg = subB.receive(1000);
+ rMsg = subC.receive(1000);
assertNull(rMsg);
session.unsubscribe("testResubscribeWithChangedSelectorAndRestart");
- subB.close();
+
+ subC.close();
session.close();
- conn.close();
+ connection.close();
}
-
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index d1132c14fb..3dd3c72024 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -23,16 +23,6 @@ package org.apache.qpid.test.unit.topic;
import java.io.IOException;
import java.util.Set;
-import org.apache.qpid.management.common.JMXConnnectionFactory;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQTopic;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import javax.jms.Connection;
import javax.jms.InvalidDestinationException;
import javax.jms.InvalidSelectorException;
@@ -48,6 +38,15 @@ import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.management.common.JMXConnnectionFactory;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* @todo Code to check that a consumer gets only one particular method could be factored into a re-usable method (as
* a static on a base test helper class, e.g. TestUtils.
@@ -118,11 +117,11 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
_logger.info("Producer sending message A");
producer.send(session1.createTextMessage("A"));
- ((AMQSession)session1).sync();
+ ((AMQSession<?, ?>) session1).sync();
//check the dur sub's underlying queue now has msg count 1
AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "MySubscription");
- assertEquals("Msg count should be 1", 1, ((AMQSession)session1).getQueueDepth(subQueue));
+ assertEquals("Msg count should be 1", 1, ((AMQSession<?, ?>) session1).getQueueDepth(subQueue));
Message msg;
_logger.info("Receive message on consumer 1:expecting A");
@@ -141,16 +140,16 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
_logger.info("Receive message on consumer 1 :expecting null");
assertEquals(null, msg);
- ((AMQSession)session2).sync();
+ ((AMQSession<?, ?>) session2).sync();
//check the dur sub's underlying queue now has msg count 0
- assertEquals("Msg count should be 0", 0, ((AMQSession)session2).getQueueDepth(subQueue));
+ assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session2).getQueueDepth(subQueue));
consumer2.close();
_logger.info("Unsubscribe session2/consumer2");
session2.unsubscribe("MySubscription");
- ((AMQSession)session2).sync();
+ ((AMQSession<?, ?>) session2).sync();
if(isJavaBroker() && isExternalBroker())
{
@@ -435,7 +434,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
con3.close();
}
- /***
+ /**
* This tests the fix for QPID-1085
* Creates a durable subscriber with an invalid selector, checks that the
* exception is thrown correctly and that the subscription is not created.
@@ -472,7 +471,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
session.unsubscribe("testDurableWithInvalidSelectorSub");
}
- /***
+ /**
* This tests the fix for QPID-1085
* Creates a durable subscriber with an invalid destination, checks that the
* exception is thrown correctly and that the subscription is not created.
@@ -509,9 +508,9 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
}
/**
- * Tests QPID-1202
* Creates a durable subscription with a selector, then changes that selector on resubscription
- * @throws Exception
+ * <p>
+ * QPID-1202, QPID-2418
*/
public void testResubscribeWithChangedSelector() throws Exception
{
@@ -544,8 +543,14 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
// Reconnect with new selector that matches B
TopicSubscriber subB = session.createDurableSubscriber(topic,
"testResubscribeWithChangedSelector","Match = False", false);
-
- // Check messages are received properly
+
+ //verify no messages are now present as changing selector should have issued
+ //an unsubscribe and thus deleted the previous backing queue for the subscription.
+ rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
+ assertNull("Should not have received message as the queue underlying the " +
+ "subscription should have been cleared/deleted when the selector was changed", rMsg);
+
+ // Check that new messages are received properly
sendMatchingAndNonMatchingMessage(session, producer);
rMsg = subB.receive(POSITIVE_RECEIVE_TIMEOUT);
@@ -594,9 +599,226 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase
msg.setBooleanProperty("Match", false);
producer.send(msg);
}
-
- public static junit.framework.Test suite()
+
+
+ /**
+ * create and register a durable subscriber with a message selector and then close it
+ * create a publisher and send 5 right messages and 5 wrong messages
+ * create another durable subscriber with the same selector and name
+ * check messages are still there
+ * <p>
+ * QPID-2418
+ */
+ public void testDurSubSameMessageSelector() throws Exception
+ {
+ Connection conn = getConnection();
+ conn.start();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ AMQTopic topic = new AMQTopic((AMQConnection) conn, "sameMessageSelector");
+
+ //create and register a durable subscriber with a message selector and then close it
+ TopicSubscriber subOne = session.createDurableSubscriber(topic, "sameMessageSelector", "testprop = TRUE", false);
+ subOne.close();
+
+ MessageProducer producer = session.createProducer(topic);
+ for (int i = 0; i < 5; i++)
+ {
+ Message message = session.createMessage();
+ message.setBooleanProperty("testprop", true);
+ producer.send(message);
+ message = session.createMessage();
+ message.setBooleanProperty("testprop", false);
+ producer.send(message);
+ }
+ producer.close();
+
+ // now recreate the durable subscriber and check the received messages
+ TopicSubscriber subTwo = session.createDurableSubscriber(topic, "sameMessageSelector", "testprop = TRUE", false);
+
+ // should be 5 messages on queue now
+ AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "sameMessageSelector");
+ assertEquals("Queue depth is wrong", 5, ((AMQSession<?, ?>) session).getQueueDepth(queue));
+
+ for (int i = 0; i < 5; i++)
+ {
+ Message message = subTwo.receive(1000);
+ if (message == null)
+ {
+ fail("sameMessageSelector test failed. no message was returned");
+ }
+ else
+ {
+ assertEquals("sameMessageSelector test failed. message selector not reset",
+ "true", message.getStringProperty("testprop"));
+ }
+ }
+
+ // Check queue has no messages
+ assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue));
+
+ // Unsubscribe
+ session.unsubscribe("sameMessageSelector");
+
+ conn.close();
+ }
+
+ /**
+ * <ul>
+ * <li>create and register a durable subscriber with a message selector
+ * <li>create another durable subscriber with a different selector and same name
+ * <li>check first subscriber is now closed
+ * <li>create a publisher and send messages
+ * <li>check messages are recieved correctly
+ * </ul>
+ * <p>
+ * QPID-2418
+ */
+ public void testResubscribeWithChangedSelectorNoClose() throws Exception
{
- return new junit.framework.TestSuite(DurableSubscriptionTest.class);
- }
+ Connection conn = getConnection();
+ conn.start();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ AMQTopic topic = new AMQTopic((AMQConnection) conn, "testResubscribeWithChangedSelectorNoClose");
+
+ // Create durable subscriber that matches A
+ TopicSubscriber subA = session.createDurableSubscriber(topic,
+ "testResubscribeWithChangedSelectorNoClose",
+ "Match = True", false);
+
+ // Reconnect with new selector that matches B
+ TopicSubscriber subB = session.createDurableSubscriber(topic,
+ "testResubscribeWithChangedSelectorNoClose",
+ "Match = false", false);
+
+ // First subscription has been closed
+ try
+ {
+ subA.receive(1000);
+ fail("First subscription was not closed");
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
+ // Send 1 matching message and 1 non-matching message
+ MessageProducer producer = session.createProducer(topic);
+ TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
+ msg.setBooleanProperty("Match", true);
+ producer.send(msg);
+ msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
+ msg.setBooleanProperty("Match", false);
+ producer.send(msg);
+
+ // should be 1 message on queue now
+ AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorNoClose");
+ assertEquals("Queue depth is wrong", 1, ((AMQSession<?, ?>) session).getQueueDepth(queue));
+
+ Message rMsg = subB.receive(1000);
+ assertNotNull(rMsg);
+ assertEquals("Content was wrong",
+ "testResubscribeWithChangedSelectorAndRestart2",
+ ((TextMessage) rMsg).getText());
+
+ rMsg = subB.receive(1000);
+ assertNull(rMsg);
+
+ // Check queue has no messages
+ assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue));
+
+ conn.close();
+ }
+
+ /**
+ * <ul>
+ * <li>create and register a durable subscriber with no message selector
+ * <li>create another durable subscriber with a selector and same name
+ * <li>check first subscriber is now closed
+ * <li>create a publisher and send messages
+ * <li>check messages are recieved correctly
+ * </ul>
+ * <p>
+ * QPID-2418
+ */
+ public void testDurSubAddMessageSelectorNoClose() throws Exception
+ {
+ Connection conn = getConnection();
+ conn.start();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ AMQTopic topic = new AMQTopic((AMQConnection) conn, "subscriptionName");
+
+ // create and register a durable subscriber with no message selector
+ TopicSubscriber subOne = session.createDurableSubscriber(topic, "subscriptionName", null, false);
+
+ // now create a durable subscriber with a selector
+ TopicSubscriber subTwo = session.createDurableSubscriber(topic, "subscriptionName", "testprop = TRUE", false);
+
+ // First subscription has been closed
+ try
+ {
+ subOne.receive(1000);
+ fail("First subscription was not closed");
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
+ // Send 1 matching message and 1 non-matching message
+ MessageProducer producer = session.createProducer(topic);
+ TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
+ msg.setBooleanProperty("testprop", true);
+ producer.send(msg);
+ msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
+ msg.setBooleanProperty("testprop", false);
+ producer.send(msg);
+
+ // should be 1 message on queue now
+ AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "subscriptionName");
+ assertEquals("Queue depth is wrong", 1, ((AMQSession<?, ?>) session).getQueueDepth(queue));
+
+ Message rMsg = subTwo.receive(1000);
+ assertNotNull(rMsg);
+ assertEquals("Content was wrong",
+ "testResubscribeWithChangedSelectorAndRestart1",
+ ((TextMessage) rMsg).getText());
+
+ rMsg = subTwo.receive(1000);
+ assertNull(rMsg);
+
+ // Check queue has no messages
+ assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue));
+
+ conn.close();
+ }
+
+ /**
+ * <ul>
+ * <li>create and register a durable subscriber with no message selector
+ * <li>try to create another durable with the same name, should fail
+ * </ul>
+ * <p>
+ * QPID-2418
+ */
+ public void testDurSubNoSelectorResubscribeNoClose() throws Exception
+ {
+ Connection conn = getConnection();
+ conn.start();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ AMQTopic topic = new AMQTopic((AMQConnection) conn, "subscriptionName");
+
+ // create and register a durable subscriber with no message selector
+ session.createDurableSubscriber(topic, "subscriptionName", null, false);
+
+ // try to recreate the durable subscriber
+ try
+ {
+ session.createDurableSubscriber(topic, "subscriptionName", null, false);
+ fail("Subscription should not have been created");
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
}