diff options
author | Robert Gemmell <robbie@apache.org> | 2010-09-05 18:50:31 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2010-09-05 18:50:31 +0000 |
commit | ba143bd7b07e39ce07f1fb7bbf3cd107a515b469 (patch) | |
tree | a0907a624a16c0d6fb2d04f32a6da8133f6ba5f5 /qpid/java/systests | |
parent | e833204097e0df021fe9bc27785faa4769441d70 (diff) | |
download | qpid-python-ba143bd7b07e39ce07f1fb7bbf3cd107a515b469.tar.gz |
QPID-2418: Unsubscribe existing open durable subscriptions when changing subscription. Remove duplication in implementations.
Applied patch from Andrew Kennedy <andrew.international@gmail.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@992855 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests')
3 files changed, 307 insertions, 46 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java index cbc2078571..abb0781536 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java @@ -21,6 +21,9 @@ package org.apache.qpid.server.queue; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; @@ -35,14 +38,11 @@ import junit.framework.Assert; import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.client.AMQQueue; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.Condition; - public class TimeToLiveTest extends QpidBrokerTestCase { private static final Logger _logger = Logger.getLogger(TimeToLiveTest.class); @@ -253,7 +253,7 @@ public class TimeToLiveTest extends QpidBrokerTestCase producerSession.commit(); //resubscribe - durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName()); + durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName(),"testprop='TimeToLiveTest'", false); // Ensure we sleep the required amount of time. ReentrantLock waitLock = new ReentrantLock(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java index 3030572e13..989ac98747 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java @@ -17,7 +17,17 @@ */ package org.apache.qpid.test.unit.ct; -import javax.jms.*; +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicConnectionFactory; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; @@ -75,7 +85,7 @@ public class DurableSubscriberTest extends QpidBrokerTestCase } catch (Exception e) { - System.out.println("problems shutting down arjuna-ms"); + _logger.error("problems restarting broker: " + e); throw e; } //now recreate the durable subscriber and check the received messages @@ -102,7 +112,7 @@ public class DurableSubscriberTest extends QpidBrokerTestCase * create and register a durable subscriber with a message selector and then close it * crash the broker * create a publisher and send 5 right messages and 5 wrong messages - * recreate the durable subscriber and check the received the 5 expected messages + * recreate the durable subscriber and check we receive the 5 expected messages */ public void testDurSubRestoresMessageSelector() throws Exception { @@ -125,7 +135,7 @@ public class DurableSubscriberTest extends QpidBrokerTestCase } catch (Exception e) { - System.out.println("problems shutting down arjuna-ms"); + _logger.error("problems restarting broker: " + e); throw e; } topic = (Topic) getInitialContext().lookup(_topicName); @@ -148,7 +158,7 @@ public class DurableSubscriberTest extends QpidBrokerTestCase //now recreate the durable subscriber and check the received messages TopicConnection durConnection2 = factory.createTopicConnection("guest", "guest"); TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, "dursub"); + TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, "dursub", "testprop='true'", false); durConnection2.start(); for (int i = 0; i < 5; i++) { @@ -385,6 +395,10 @@ public class DurableSubscriberTest extends QpidBrokerTestCase assertEquals("Content was wrong", "testResubscribeWithChangedSelectorAndRestart1", ((TextMessage) rMsg).getText()); + + // Queue has no messages left + AMQQueue subQueueTmp = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart"); + assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session).getQueueDepth(subQueueTmp)); rMsg = subA.receive(1000); assertNull(rMsg); @@ -403,14 +417,14 @@ public class DurableSubscriberTest extends QpidBrokerTestCase // Reconnect with new selector that matches B TopicSubscriber subB = session.createDurableSubscriber(topic, - "testResubscribeWithChangedSelectorAndRestart","Match = False", false); + "testResubscribeWithChangedSelectorAndRestart", + "Match = false", false); //verify no messages are now present on the queue as changing selector should have issued //an unsubscribe and thus deleted the previous durable backing queue for the subscription. - //check the dur sub's underlying queue now has msg count 1 - AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelector"); - assertEquals("Msg count should be 0", 0, ((AMQSession)session).getQueueDepth(subQueue)); - + //check the dur sub's underlying queue now has msg count 0 + AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart"); + assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session).getQueueDepth(subQueue)); // Check that new messages are received properly msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1"); @@ -429,6 +443,10 @@ public class DurableSubscriberTest extends QpidBrokerTestCase rMsg = subB.receive(1000); assertNull(rMsg); + //check the dur sub's underlying queue now has msg count 0 + subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart"); + assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session).getQueueDepth(subQueue)); + //now restart the server try { @@ -440,28 +458,49 @@ public class DurableSubscriberTest extends QpidBrokerTestCase throw e; } - // Check that new messages are still received properly + // Reconnect to broker + Connection connection = getConnectionFactory().createConnection("guest", "guest"); + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + topic = new AMQTopic((AMQConnection) connection, "testResubscribeWithChangedSelectorAndRestart"); + producer = session.createProducer(topic); + + //verify no messages now present on the queue after we restart the broker + //check the dur sub's underlying queue now has msg count 0 + subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart"); + assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session).getQueueDepth(subQueue)); + + // Reconnect with new selector that matches B + TopicSubscriber subC = session.createDurableSubscriber(topic, + "testResubscribeWithChangedSelectorAndRestart", + "Match = False", false); + + // Check that new messages are still sent and recieved properly msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1"); msg.setBooleanProperty("Match", true); producer.send(msg); msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2"); msg.setBooleanProperty("Match", false); producer.send(msg); + + //check the dur sub's underlying queue now has msg count 1 + subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart"); + assertEquals("Msg count should be 1", 1, ((AMQSession<?, ?>) session).getQueueDepth(subQueue)); - rMsg = subB.receive(1000); + rMsg = subC.receive(1000); assertNotNull(rMsg); assertEquals("Content was wrong", "testResubscribeWithChangedSelectorAndRestart2", ((TextMessage) rMsg).getText()); - rMsg = subB.receive(1000); + rMsg = subC.receive(1000); assertNull(rMsg); session.unsubscribe("testResubscribeWithChangedSelectorAndRestart"); - subB.close(); + + subC.close(); session.close(); - conn.close(); + connection.close(); } - } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index d1132c14fb..3dd3c72024 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -23,16 +23,6 @@ package org.apache.qpid.test.unit.topic; import java.io.IOException; import java.util.Set; -import org.apache.qpid.management.common.JMXConnnectionFactory; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQTopic; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import javax.jms.Connection; import javax.jms.InvalidDestinationException; import javax.jms.InvalidSelectorException; @@ -48,6 +38,15 @@ import javax.management.MBeanServerConnection; import javax.management.ObjectName; import javax.management.remote.JMXConnector; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.management.common.JMXConnnectionFactory; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * @todo Code to check that a consumer gets only one particular method could be factored into a re-usable method (as * a static on a base test helper class, e.g. TestUtils. @@ -118,11 +117,11 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase _logger.info("Producer sending message A"); producer.send(session1.createTextMessage("A")); - ((AMQSession)session1).sync(); + ((AMQSession<?, ?>) session1).sync(); //check the dur sub's underlying queue now has msg count 1 AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "MySubscription"); - assertEquals("Msg count should be 1", 1, ((AMQSession)session1).getQueueDepth(subQueue)); + assertEquals("Msg count should be 1", 1, ((AMQSession<?, ?>) session1).getQueueDepth(subQueue)); Message msg; _logger.info("Receive message on consumer 1:expecting A"); @@ -141,16 +140,16 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase _logger.info("Receive message on consumer 1 :expecting null"); assertEquals(null, msg); - ((AMQSession)session2).sync(); + ((AMQSession<?, ?>) session2).sync(); //check the dur sub's underlying queue now has msg count 0 - assertEquals("Msg count should be 0", 0, ((AMQSession)session2).getQueueDepth(subQueue)); + assertEquals("Msg count should be 0", 0, ((AMQSession<?, ?>) session2).getQueueDepth(subQueue)); consumer2.close(); _logger.info("Unsubscribe session2/consumer2"); session2.unsubscribe("MySubscription"); - ((AMQSession)session2).sync(); + ((AMQSession<?, ?>) session2).sync(); if(isJavaBroker() && isExternalBroker()) { @@ -435,7 +434,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase con3.close(); } - /*** + /** * This tests the fix for QPID-1085 * Creates a durable subscriber with an invalid selector, checks that the * exception is thrown correctly and that the subscription is not created. @@ -472,7 +471,7 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase session.unsubscribe("testDurableWithInvalidSelectorSub"); } - /*** + /** * This tests the fix for QPID-1085 * Creates a durable subscriber with an invalid destination, checks that the * exception is thrown correctly and that the subscription is not created. @@ -509,9 +508,9 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase } /** - * Tests QPID-1202 * Creates a durable subscription with a selector, then changes that selector on resubscription - * @throws Exception + * <p> + * QPID-1202, QPID-2418 */ public void testResubscribeWithChangedSelector() throws Exception { @@ -544,8 +543,14 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase // Reconnect with new selector that matches B TopicSubscriber subB = session.createDurableSubscriber(topic, "testResubscribeWithChangedSelector","Match = False", false); - - // Check messages are received properly + + //verify no messages are now present as changing selector should have issued + //an unsubscribe and thus deleted the previous backing queue for the subscription. + rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT); + assertNull("Should not have received message as the queue underlying the " + + "subscription should have been cleared/deleted when the selector was changed", rMsg); + + // Check that new messages are received properly sendMatchingAndNonMatchingMessage(session, producer); rMsg = subB.receive(POSITIVE_RECEIVE_TIMEOUT); @@ -594,9 +599,226 @@ public class DurableSubscriptionTest extends QpidBrokerTestCase msg.setBooleanProperty("Match", false); producer.send(msg); } - - public static junit.framework.Test suite() + + + /** + * create and register a durable subscriber with a message selector and then close it + * create a publisher and send 5 right messages and 5 wrong messages + * create another durable subscriber with the same selector and name + * check messages are still there + * <p> + * QPID-2418 + */ + public void testDurSubSameMessageSelector() throws Exception + { + Connection conn = getConnection(); + conn.start(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + AMQTopic topic = new AMQTopic((AMQConnection) conn, "sameMessageSelector"); + + //create and register a durable subscriber with a message selector and then close it + TopicSubscriber subOne = session.createDurableSubscriber(topic, "sameMessageSelector", "testprop = TRUE", false); + subOne.close(); + + MessageProducer producer = session.createProducer(topic); + for (int i = 0; i < 5; i++) + { + Message message = session.createMessage(); + message.setBooleanProperty("testprop", true); + producer.send(message); + message = session.createMessage(); + message.setBooleanProperty("testprop", false); + producer.send(message); + } + producer.close(); + + // now recreate the durable subscriber and check the received messages + TopicSubscriber subTwo = session.createDurableSubscriber(topic, "sameMessageSelector", "testprop = TRUE", false); + + // should be 5 messages on queue now + AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "sameMessageSelector"); + assertEquals("Queue depth is wrong", 5, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + + for (int i = 0; i < 5; i++) + { + Message message = subTwo.receive(1000); + if (message == null) + { + fail("sameMessageSelector test failed. no message was returned"); + } + else + { + assertEquals("sameMessageSelector test failed. message selector not reset", + "true", message.getStringProperty("testprop")); + } + } + + // Check queue has no messages + assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + + // Unsubscribe + session.unsubscribe("sameMessageSelector"); + + conn.close(); + } + + /** + * <ul> + * <li>create and register a durable subscriber with a message selector + * <li>create another durable subscriber with a different selector and same name + * <li>check first subscriber is now closed + * <li>create a publisher and send messages + * <li>check messages are recieved correctly + * </ul> + * <p> + * QPID-2418 + */ + public void testResubscribeWithChangedSelectorNoClose() throws Exception { - return new junit.framework.TestSuite(DurableSubscriptionTest.class); - } + Connection conn = getConnection(); + conn.start(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + AMQTopic topic = new AMQTopic((AMQConnection) conn, "testResubscribeWithChangedSelectorNoClose"); + + // Create durable subscriber that matches A + TopicSubscriber subA = session.createDurableSubscriber(topic, + "testResubscribeWithChangedSelectorNoClose", + "Match = True", false); + + // Reconnect with new selector that matches B + TopicSubscriber subB = session.createDurableSubscriber(topic, + "testResubscribeWithChangedSelectorNoClose", + "Match = false", false); + + // First subscription has been closed + try + { + subA.receive(1000); + fail("First subscription was not closed"); + } + catch (Exception e) + { + e.printStackTrace(); + } + + // Send 1 matching message and 1 non-matching message + MessageProducer producer = session.createProducer(topic); + TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1"); + msg.setBooleanProperty("Match", true); + producer.send(msg); + msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2"); + msg.setBooleanProperty("Match", false); + producer.send(msg); + + // should be 1 message on queue now + AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorNoClose"); + assertEquals("Queue depth is wrong", 1, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + + Message rMsg = subB.receive(1000); + assertNotNull(rMsg); + assertEquals("Content was wrong", + "testResubscribeWithChangedSelectorAndRestart2", + ((TextMessage) rMsg).getText()); + + rMsg = subB.receive(1000); + assertNull(rMsg); + + // Check queue has no messages + assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + + conn.close(); + } + + /** + * <ul> + * <li>create and register a durable subscriber with no message selector + * <li>create another durable subscriber with a selector and same name + * <li>check first subscriber is now closed + * <li>create a publisher and send messages + * <li>check messages are recieved correctly + * </ul> + * <p> + * QPID-2418 + */ + public void testDurSubAddMessageSelectorNoClose() throws Exception + { + Connection conn = getConnection(); + conn.start(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + AMQTopic topic = new AMQTopic((AMQConnection) conn, "subscriptionName"); + + // create and register a durable subscriber with no message selector + TopicSubscriber subOne = session.createDurableSubscriber(topic, "subscriptionName", null, false); + + // now create a durable subscriber with a selector + TopicSubscriber subTwo = session.createDurableSubscriber(topic, "subscriptionName", "testprop = TRUE", false); + + // First subscription has been closed + try + { + subOne.receive(1000); + fail("First subscription was not closed"); + } + catch (Exception e) + { + e.printStackTrace(); + } + + // Send 1 matching message and 1 non-matching message + MessageProducer producer = session.createProducer(topic); + TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1"); + msg.setBooleanProperty("testprop", true); + producer.send(msg); + msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2"); + msg.setBooleanProperty("testprop", false); + producer.send(msg); + + // should be 1 message on queue now + AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "subscriptionName"); + assertEquals("Queue depth is wrong", 1, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + + Message rMsg = subTwo.receive(1000); + assertNotNull(rMsg); + assertEquals("Content was wrong", + "testResubscribeWithChangedSelectorAndRestart1", + ((TextMessage) rMsg).getText()); + + rMsg = subTwo.receive(1000); + assertNull(rMsg); + + // Check queue has no messages + assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue)); + + conn.close(); + } + + /** + * <ul> + * <li>create and register a durable subscriber with no message selector + * <li>try to create another durable with the same name, should fail + * </ul> + * <p> + * QPID-2418 + */ + public void testDurSubNoSelectorResubscribeNoClose() throws Exception + { + Connection conn = getConnection(); + conn.start(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + AMQTopic topic = new AMQTopic((AMQConnection) conn, "subscriptionName"); + + // create and register a durable subscriber with no message selector + session.createDurableSubscriber(topic, "subscriptionName", null, false); + + // try to recreate the durable subscriber + try + { + session.createDurableSubscriber(topic, "subscriptionName", null, false); + fail("Subscription should not have been created"); + } + catch (Exception e) + { + e.printStackTrace(); + } + } } |