summaryrefslogtreecommitdiff
path: root/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java')
-rw-r--r--trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java444
1 files changed, 0 insertions, 444 deletions
diff --git a/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
deleted file mode 100644
index 2a44413ac8..0000000000
--- a/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ /dev/null
@@ -1,444 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.topic;
-
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQTopic;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.Connection;
-import javax.jms.InvalidDestinationException;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
-
-/**
- * @todo Code to check that a consumer gets only one particular method could be factored into a re-usable method (as
- * a static on a base test helper class, e.g. TestUtils.
- *
- * @todo Code to create test end-points using session per connection, or all sessions on one connection, to be factored
- * out to make creating this test variation simpler. Want to make this variation available through LocalCircuit,
- * driven by the test model.
- */
-public class DurableSubscriptionTest extends QpidTestCase
-{
- private static final Logger _logger = LoggerFactory.getLogger(DurableSubscriptionTest.class);
-
- /** Timeout for receive() if we are expecting a message */
- private static final long POSITIVE_RECEIVE_TIMEOUT = 2000;
-
- /** Timeout for receive() if we are not expecting a message */
- private static final long NEGATIVE_RECEIVE_TIMEOUT = 1000;
-
- public void testUnsubscribe() throws Exception
- {
- AMQConnection con = (AMQConnection) getConnection("guest", "guest");
- AMQTopic topic = new AMQTopic(con, "MyDurableSubscriptionTestTopic");
- _logger.info("Create Session 1");
- Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
- _logger.info("Create Consumer on Session 1");
- MessageConsumer consumer1 = session1.createConsumer(topic);
- _logger.info("Create Producer on Session 1");
- MessageProducer producer = session1.createProducer(topic);
-
- _logger.info("Create Session 2");
- Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
- _logger.info("Create Durable Subscriber on Session 2");
- TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
-
- _logger.info("Starting connection");
- con.start();
-
- _logger.info("Producer sending message A");
- producer.send(session1.createTextMessage("A"));
-
- Message msg;
- _logger.info("Receive message on consumer 1:expecting A");
- msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
- assertNotNull("Message should have been received",msg);
- assertEquals("A", ((TextMessage) msg).getText());
- _logger.info("Receive message on consumer 1 :expecting null");
- msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
- assertEquals(null, msg);
-
- _logger.info("Receive message on consumer 2:expecting A");
- msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
- assertNotNull("Message should have been received",msg);
- assertEquals("A", ((TextMessage) msg).getText());
- msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
- _logger.info("Receive message on consumer 1 :expecting null");
- assertEquals(null, msg);
-
- _logger.info("Unsubscribe session2/consumer2");
- session2.unsubscribe("MySubscription");
-
- _logger.info("Producer sending message B");
- producer.send(session1.createTextMessage("B"));
-
- _logger.info("Receive message on consumer 1 :expecting B");
- msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
- assertNotNull("Message should have been received",msg);
- assertEquals("B", ((TextMessage) msg).getText());
- _logger.info("Receive message on consumer 1 :expecting null");
- msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
- assertEquals(null, msg);
-
- _logger.info("Receive message on consumer 2 :expecting null");
- msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
- assertEquals(null, msg);
-
- _logger.info("Close connection");
- con.close();
- }
-
- public void testDurabilityAUTOACK() throws Exception
- {
- durabilityImpl(Session.AUTO_ACKNOWLEDGE);
- }
-
- public void testDurabilityNOACKSessionPerConnection() throws Exception
- {
- durabilityImplSessionPerConnection(AMQSession.NO_ACKNOWLEDGE);
- }
-
- public void testDurabilityAUTOACKSessionPerConnection() throws Exception
- {
- durabilityImplSessionPerConnection(Session.AUTO_ACKNOWLEDGE);
- }
-
- private void durabilityImpl(int ackMode) throws Exception
- {
- AMQConnection con = (AMQConnection) getConnection("guest", "guest");
- AMQTopic topic = new AMQTopic(con, "MyTopic");
- Session session1 = con.createSession(false, ackMode);
- MessageConsumer consumer1 = session1.createConsumer(topic);
-
- Session sessionProd = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
- MessageProducer producer = sessionProd.createProducer(topic);
-
- Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
- TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
-
- con.start();
-
- producer.send(session1.createTextMessage("A"));
-
- Message msg;
- msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
- assertNotNull("Message should have been received",msg);
- assertEquals("A", ((TextMessage) msg).getText());
- msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
- assertEquals(null, msg);
-
- msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
- assertNotNull("Message should have been received",msg);
- assertEquals("A", ((TextMessage) msg).getText());
- msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
- assertEquals(null, msg);
-
- consumer2.close();
-
- producer.send(session1.createTextMessage("B"));
-
- _logger.info("Receive message on consumer 1 :expecting B");
- msg = consumer1.receive(500);
- assertNotNull("Consumer 1 should get message 'B'.", msg);
- assertEquals("Incorrect Message recevied on consumer1.", "B", ((TextMessage) msg).getText());
- _logger.info("Receive message on consumer 1 :expecting null");
- msg = consumer1.receive(500);
- assertNull("There should be no more messages for consumption on consumer1.", msg);
-
- Session session3 = con.createSession(false, ackMode);
- MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
-
- _logger.info("Receive message on consumer 3 :expecting B");
- msg = consumer3.receive(500);
- assertNotNull("Consumer 3 should get message 'B'.", msg);
- assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText());
- _logger.info("Receive message on consumer 3 :expecting null");
- msg = consumer3.receive(500);
- assertNull("There should be no more messages for consumption on consumer3.", msg);
-
- consumer1.close();
- consumer3.close();
-
- session3.unsubscribe("MySubscription");
-
- con.close();
- }
-
- private void durabilityImplSessionPerConnection(int ackMode) throws Exception
- {
- Message msg;
- // Create producer.
- AMQConnection con0 = (AMQConnection) getConnection("guest", "guest");
- con0.start();
- Session session0 = con0.createSession(false, ackMode);
-
- AMQTopic topic = new AMQTopic(con0, "MyTopic");
-
- Session sessionProd = con0.createSession(false, ackMode);
- MessageProducer producer = sessionProd.createProducer(topic);
-
- // Create consumer 1.
- AMQConnection con1 = (AMQConnection) getConnection("guest", "guest");
- con1.start();
- Session session1 = con1.createSession(false, ackMode);
-
- MessageConsumer consumer1 = session0.createConsumer(topic);
-
- // Create consumer 2.
- AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
- con2.start();
- Session session2 = con2.createSession(false, ackMode);
-
- TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
-
- // Send message and check that both consumers get it and only it.
- producer.send(session0.createTextMessage("A"));
-
- msg = consumer1.receive(500);
- assertNotNull("Message should be available", msg);
- assertEquals("Message Text doesn't match", "A", ((TextMessage) msg).getText());
- msg = consumer1.receive(500);
- assertNull("There should be no more messages for consumption on consumer1.", msg);
-
- msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
- assertNotNull("Message should have been received",msg);
- assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText());
- msg = consumer2.receive(500);
- assertNull("There should be no more messages for consumption on consumer2.", msg);
-
- // Detach the durable subscriber.
- consumer2.close();
- session2.close();
- con2.close();
-
- // Send message and receive on open consumer.
- producer.send(session0.createTextMessage("B"));
-
- _logger.info("Receive message on consumer 1 :expecting B");
- msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
- assertEquals("B", ((TextMessage) msg).getText());
- _logger.info("Receive message on consumer 1 :expecting null");
- msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
- assertEquals(null, msg);
-
- // Re-attach a new consumer to the durable subscription, and check that it gets the message that it missed.
- AMQConnection con3 = (AMQConnection) getConnection("guest", "guest");
- con3.start();
- Session session3 = con3.createSession(false, ackMode);
-
- TopicSubscriber consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
-
- _logger.info("Receive message on consumer 3 :expecting B");
- msg = consumer3.receive(500);
- assertNotNull("Consumer 3 should get message 'B'.", msg);
- assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText());
- _logger.info("Receive message on consumer 3 :expecting null");
- msg = consumer3.receive(500);
- assertNull("There should be no more messages for consumption on consumer3.", msg);
-
- consumer1.close();
- consumer3.close();
-
- session3.unsubscribe("MySubscription");
-
- con0.close();
- con1.close();
- con3.close();
- }
-
- /***
- * This tests the fix for QPID-1085
- * Creates a durable subscriber with an invalid selector, checks that the
- * exception is thrown correctly and that the subscription is not created.
- * @throws Exception
- */
- public void testDurableWithInvalidSelector() throws Exception
- {
- Connection conn = getConnection();
- conn.start();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- AMQTopic topic = new AMQTopic((AMQConnection) conn, "MyTestDurableWithInvalidSelectorTopic");
- MessageProducer producer = session.createProducer(topic);
- producer.send(session.createTextMessage("testDurableWithInvalidSelector1"));
- try
- {
- TopicSubscriber deadSubscriber = session.createDurableSubscriber(topic, "testDurableWithInvalidSelectorSub",
- "=TEST 'test", true);
- assertNull("Subscriber should not have been created", deadSubscriber);
- }
- catch (JMSException e)
- {
- assertTrue("Wrong type of exception thrown", e instanceof InvalidSelectorException);
- }
-
- TopicSubscriber liveSubscriber = session.createDurableSubscriber(topic, "testDurableWithInvalidSelectorSub");
- assertNotNull("Subscriber should have been created", liveSubscriber);
-
- producer.send(session.createTextMessage("testDurableWithInvalidSelector2"));
-
- Message msg = liveSubscriber.receive(POSITIVE_RECEIVE_TIMEOUT);
- assertNotNull ("Message should have been received", msg);
- assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText());
- assertNull("Should not receive subsequent message", liveSubscriber.receive(200));
- session.unsubscribe("testDurableWithInvalidSelectorSub");
- }
-
- /***
- * This tests the fix for QPID-1085
- * Creates a durable subscriber with an invalid destination, checks that the
- * exception is thrown correctly and that the subscription is not created.
- * @throws Exception
- */
- public void testDurableWithInvalidDestination() throws Exception
- {
- Connection conn = getConnection();
- conn.start();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- AMQTopic topic = new AMQTopic((AMQConnection) conn, "testDurableWithInvalidDestinationTopic");
- try
- {
- TopicSubscriber deadSubscriber = session.createDurableSubscriber(null, "testDurableWithInvalidDestinationsub");
- assertNull("Subscriber should not have been created", deadSubscriber);
- }
- catch (InvalidDestinationException e)
- {
- // This was expected
- }
- MessageProducer producer = session.createProducer(topic);
- producer.send(session.createTextMessage("testDurableWithInvalidSelector1"));
-
- TopicSubscriber liveSubscriber = session.createDurableSubscriber(topic, "testDurableWithInvalidDestinationsub");
- assertNotNull("Subscriber should have been created", liveSubscriber);
-
- producer.send(session.createTextMessage("testDurableWithInvalidSelector2"));
- Message msg = liveSubscriber.receive(POSITIVE_RECEIVE_TIMEOUT);
- assertNotNull ("Message should have been received", msg);
- assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText());
- assertNull("Should not receive subsequent message", liveSubscriber.receive(200));
-
- session.unsubscribe("testDurableWithInvalidDestinationsub");
- }
-
- /**
- * Tests QPID-1202
- * Creates a durable subscription with a selector, then changes that selector on resubscription
- * @throws Exception
- */
- public void testResubscribeWithChangedSelector() throws Exception
- {
- Connection conn = getConnection();
- conn.start();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- AMQTopic topic = new AMQTopic((AMQConnection) conn, "testResubscribeWithChangedSelector");
- MessageProducer producer = session.createProducer(topic);
-
- // Create durable subscriber that matches A
- TopicSubscriber subA = session.createDurableSubscriber(topic,
- "testResubscribeWithChangedSelector",
- "Match = True", false);
-
- // Send 1 matching message and 1 non-matching message
- sendMatchingAndNonMatchingMessage(session, producer);
-
- Message rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT);
- assertNotNull(rMsg);
- assertEquals("Content was wrong",
- "testResubscribeWithChangedSelector1",
- ((TextMessage) rMsg).getText());
-
- rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT);
- assertNull(rMsg);
-
- // Disconnect subscriber
- subA.close();
-
- // Reconnect with new selector that matches B
- TopicSubscriber subB = session.createDurableSubscriber(topic,
- "testResubscribeWithChangedSelector","Match = False", false);
-
-
- // Check messages are recieved properly
- sendMatchingAndNonMatchingMessage(session, producer);
- rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
- assertNotNull(rMsg);
- assertEquals("Content was wrong",
- "testResubscribeWithChangedSelector2",
- ((TextMessage) rMsg).getText());
-
- rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
- assertNull(rMsg);
- session.unsubscribe("testResubscribeWithChangedSelector");
- }
-
- public void testDurableSubscribeWithTemporaryTopic() throws Exception
- {
- Connection conn = getConnection();
- conn.start();
- Session ssn = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = ssn.createTemporaryTopic();
- try
- {
- ssn.createDurableSubscriber(topic, "test");
- fail("expected InvalidDestinationException");
- }
- catch (InvalidDestinationException ex)
- {
- // this is expected
- }
- try
- {
- ssn.createDurableSubscriber(topic, "test", null, false);
- fail("expected InvalidDestinationException");
- }
- catch (InvalidDestinationException ex)
- {
- // this is expected
- }
- }
-
- private void sendMatchingAndNonMatchingMessage(Session session, MessageProducer producer) throws JMSException
- {
- TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelector1");
- msg.setBooleanProperty("Match", true);
- producer.send(msg);
- msg = session.createTextMessage("testResubscribeWithChangedSelector2");
- msg.setBooleanProperty("Match", false);
- producer.send(msg);
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(DurableSubscriptionTest.class);
- }
-}