/* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.qpid.test.unit.ct; import javax.jms.Connection; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.test.utils.QpidBrokerTestCase; /** * Crash Recovery tests for durable subscription * */ public class DurableSubscriberTest extends QpidBrokerTestCase { private final String _topicName = "durableSubscriberTopic"; /** * test strategy: * create and register a durable subscriber then close it * create a publisher and send a persistant message followed by a non persistant message * crash and restart the broker * recreate the durable subscriber and check that only the first message is received */ public void testDurSubRestoredAfterNonPersistentMessageSent() throws Exception { if (isBrokerStorePersistent()) { TopicConnectionFactory factory = getConnectionFactory(); Topic topic = (Topic) getInitialContext().lookup(_topicName); //create and register a durable subscriber then close it TopicConnection durConnection = factory.createTopicConnection("guest", "guest"); TopicSession durSession = durConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, "dursub"); durConnection.start(); durSub1.close(); durSession.close(); durConnection.stop(); //create a publisher and send a persistant message followed by a non persistant message TopicConnection pubConnection = factory.createTopicConnection("guest", "guest"); TopicSession pubSession = pubConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicPublisher publisher = pubSession.createPublisher(topic); Message message = pubSession.createMessage(); message.setIntProperty("count", 1); publisher.publish(message, javax.jms.DeliveryMode.PERSISTENT, javax.jms.Message.DEFAULT_PRIORITY, javax.jms.Message.DEFAULT_TIME_TO_LIVE); message.setIntProperty("count", 2); publisher.publish(message, javax.jms.DeliveryMode.NON_PERSISTENT, javax.jms.Message.DEFAULT_PRIORITY, javax.jms.Message.DEFAULT_TIME_TO_LIVE); publisher.close(); pubSession.close(); //now stop the server try { restartBroker(); } catch (Exception e) { _logger.error("problems restarting broker: " + e); throw e; } //now recreate the durable subscriber and check the received messages factory = getConnectionFactory(); topic = (Topic) getInitialContext().lookup(_topicName); TopicConnection durConnection2 = factory.createTopicConnection("guest", "guest"); TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, "dursub"); durConnection2.start(); Message m1 = durSub2.receive(1000); if (m1 == null) { assertTrue("testDurSubRestoredAfterNonPersistentMessageSent test failed. no message was returned", false); } assertTrue("testDurSubRestoredAfterNonPersistentMessageSent test failed. Wrong message was returned.", m1.getIntProperty("count") == 1); durSession2.unsubscribe("dursub"); durConnection2.close(); } } /** * create and register a durable subscriber with a message selector and then close it * crash the broker * create a publisher and send 5 right messages and 5 wrong messages * recreate the durable subscriber and check we receive the 5 expected messages */ public void testDurSubRestoresMessageSelector() throws Exception { if (isBrokerStorePersistent()) { TopicConnectionFactory factory = getConnectionFactory(); Topic topic = (Topic) getInitialContext().lookup(_topicName); //create and register a durable subscriber with a message selector and then close it TopicConnection durConnection = factory.createTopicConnection("guest", "guest"); TopicSession durSession = durConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, "dursub", "testprop='true'", false); durConnection.start(); durSub1.close(); durSession.close(); durConnection.stop(); //now stop the server try { restartBroker(); } catch (Exception e) { _logger.error("problems restarting broker: " + e); throw e; } topic = (Topic) getInitialContext().lookup(_topicName); factory = getConnectionFactory(); TopicConnection pubConnection = factory.createTopicConnection("guest", "guest"); TopicSession pubSession = pubConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicPublisher publisher = pubSession.createPublisher(topic); for (int i = 0; i < 5; i++) { Message message = pubSession.createMessage(); message.setStringProperty("testprop", "true"); publisher.publish(message); message = pubSession.createMessage(); message.setStringProperty("testprop", "false"); publisher.publish(message); } publisher.close(); pubSession.close(); //now recreate the durable subscriber and check the received messages TopicConnection durConnection2 = factory.createTopicConnection("guest", "guest"); TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, "dursub", "testprop='true'", false); durConnection2.start(); for (int i = 0; i < 5; i++) { Message message = durSub2.receive(1000); if (message == null) { assertTrue("testDurSubRestoresMessageSelector test failed. no message was returned", false); } else { assertTrue("testDurSubRestoresMessageSelector test failed. message selector not reset", message.getStringProperty("testprop").equals("true")); } } durSession2.unsubscribe("dursub"); durConnection2.close(); } } /** * create and register a durable subscriber without a message selector and then unsubscribe it * create and register a durable subscriber with a message selector and then close it * restart the broker * send matching and non matching messages * recreate and register the durable subscriber with a message selector * verify only the matching messages are received */ public void testDurSubChangedToHaveSelectorThenRestart() throws Exception { if (! isBrokerStorePersistent()) { _logger.warn("Test skipped due to requirement of a persistent store"); return; } final String SUB_NAME=getTestQueueName(); TopicConnectionFactory factory = getConnectionFactory(); Topic topic = (Topic) getInitialContext().lookup(_topicName); //create and register a durable subscriber then unsubscribe it TopicConnection durConnection = factory.createTopicConnection("guest", "guest"); TopicSession durSession = durConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, SUB_NAME); durConnection.start(); durSub1.close(); durSession.unsubscribe(SUB_NAME); durSession.close(); durConnection.close(); //create and register a durable subscriber with a message selector and then close it TopicConnection durConnection2 = factory.createTopicConnection("guest", "guest"); TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, SUB_NAME, "testprop='true'", false); durConnection2.start(); durSub2.close(); durSession2.close(); durConnection2.close(); //now restart the server try { restartBroker(); } catch (Exception e) { _logger.error("problems restarting broker: " + e); throw e; } //send messages matching and not matching the selector TopicConnection pubConnection = factory.createTopicConnection("guest", "guest"); TopicSession pubSession = pubConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicPublisher publisher = pubSession.createPublisher(topic); for (int i = 0; i < 5; i++) { Message message = pubSession.createMessage(); message.setStringProperty("testprop", "true"); publisher.publish(message); message = pubSession.createMessage(); message.setStringProperty("testprop", "false"); publisher.publish(message); } publisher.close(); pubSession.close(); //now recreate the durable subscriber with selector to check there are no exceptions generated //and then verify the messages are received correctly TopicConnection durConnection3 = (TopicConnection) factory.createConnection("guest", "guest"); TopicSession durSession3 = (TopicSession) durConnection3.createSession(false, Session.AUTO_ACKNOWLEDGE); TopicSubscriber durSub3 = durSession3.createDurableSubscriber(topic, SUB_NAME, "testprop='true'", false); durConnection3.start(); for (int i = 0; i < 5; i++) { Message message = durSub3.receive(2000); if (message == null) { fail("testDurSubChangedToHaveSelectorThenRestart test failed. Expected message " + i + " was not returned"); } else { assertTrue("testDurSubChangedToHaveSelectorThenRestart test failed. Got message not matching selector", message.getStringProperty("testprop").equals("true")); } } durSub3.close(); durSession3.unsubscribe(SUB_NAME); durSession3.close(); durConnection3.close(); } /** * create and register a durable subscriber with a message selector and then unsubscribe it * create and register a durable subscriber without a message selector and then close it * restart the broker * send matching and non matching messages * recreate and register the durable subscriber without a message selector * verify ALL the sent messages are received */ public void testDurSubChangedToNotHaveSelectorThenRestart() throws Exception { if (! isBrokerStorePersistent()) { _logger.warn("Test skipped due to requirement of a persistent store"); return; } final String SUB_NAME=getTestQueueName(); TopicConnectionFactory factory = getConnectionFactory(); Topic topic = (Topic) getInitialContext().lookup(_topicName); //create and register a durable subscriber with selector then unsubscribe it TopicConnection durConnection = factory.createTopicConnection("guest", "guest"); TopicSession durSession = durConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, SUB_NAME, "testprop='true'", false); durConnection.start(); durSub1.close(); durSession.unsubscribe(SUB_NAME); durSession.close(); durConnection.close(); //create and register a durable subscriber without the message selector and then close it TopicConnection durConnection2 = factory.createTopicConnection("guest", "guest"); TopicSession durSession2 = durConnection2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, SUB_NAME); durConnection2.start(); durSub2.close(); durSession2.close(); durConnection2.close(); //now restart the server try { restartBroker(); } catch (Exception e) { _logger.error("problems restarting broker: " + e); throw e; } //send messages matching and not matching the original used selector TopicConnection pubConnection = factory.createTopicConnection("guest", "guest"); TopicSession pubSession = pubConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicPublisher publisher = pubSession.createPublisher(topic); for (int i = 1; i <= 5; i++) { Message message = pubSession.createMessage(); message.setStringProperty("testprop", "true"); publisher.publish(message); message = pubSession.createMessage(); message.setStringProperty("testprop", "false"); publisher.publish(message); } publisher.close(); pubSession.close(); //now recreate the durable subscriber without selector to check there are no exceptions generated //then verify ALL messages sent are received TopicConnection durConnection3 = (TopicConnection) factory.createConnection("guest", "guest"); TopicSession durSession3 = (TopicSession) durConnection3.createSession(false, Session.AUTO_ACKNOWLEDGE); TopicSubscriber durSub3 = durSession3.createDurableSubscriber(topic, SUB_NAME); durConnection3.start(); for (int i = 1; i <= 10; i++) { Message message = durSub3.receive(2000); if (message == null) { fail("testDurSubChangedToNotHaveSelectorThenRestart test failed. Expected message " + i + " was not received"); } } durSub3.close(); durSession3.unsubscribe(SUB_NAME); durSession3.close(); durConnection3.close(); } public void testResubscribeWithChangedSelectorAndRestart() throws Exception { if (! isBrokerStorePersistent()) { _logger.warn("Test skipped due to requirement of a persistent store"); return; } Connection conn = getConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); AMQTopic topic = new AMQTopic((AMQConnection) conn, "testResubscribeWithChangedSelectorAndRestart"); MessageProducer producer = session.createProducer(topic); // Create durable subscriber that matches A TopicSubscriber subA = session.createDurableSubscriber(topic, "testResubscribeWithChangedSelector", "Match = True", false); // Send 1 matching message and 1 non-matching message TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1"); msg.setBooleanProperty("Match", true); producer.send(msg); msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2"); msg.setBooleanProperty("Match", false); producer.send(msg); Message rMsg = subA.receive(1000); assertNotNull(rMsg); assertEquals("Content was wrong", "testResubscribeWithChangedSelectorAndRestart1", ((TextMessage) rMsg).getText()); // Queue has no messages left AMQQueue subQueueTmp = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart"); assertEquals("Msg count should be 0", 0, ((AMQSession) session).getQueueDepth(subQueueTmp)); rMsg = subA.receive(1000); assertNull(rMsg); // Send another 1 matching message and 1 non-matching message msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1"); msg.setBooleanProperty("Match", true); producer.send(msg); msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2"); msg.setBooleanProperty("Match", false); producer.send(msg); // Disconnect subscriber without receiving the message to //leave it on the underlying queue subA.close(); // Reconnect with new selector that matches B TopicSubscriber subB = session.createDurableSubscriber(topic, "testResubscribeWithChangedSelectorAndRestart", "Match = false", false); //verify no messages are now present on the queue as changing selector should have issued //an unsubscribe and thus deleted the previous durable backing queue for the subscription. //check the dur sub's underlying queue now has msg count 0 AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart"); assertEquals("Msg count should be 0", 0, ((AMQSession) session).getQueueDepth(subQueue)); // Check that new messages are received properly msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1"); msg.setBooleanProperty("Match", true); producer.send(msg); msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2"); msg.setBooleanProperty("Match", false); producer.send(msg); rMsg = subB.receive(1000); assertNotNull(rMsg); assertEquals("Content was wrong", "testResubscribeWithChangedSelectorAndRestart2", ((TextMessage) rMsg).getText()); rMsg = subB.receive(1000); assertNull(rMsg); //check the dur sub's underlying queue now has msg count 0 subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart"); assertEquals("Msg count should be 0", 0, ((AMQSession) session).getQueueDepth(subQueue)); //now restart the server try { restartBroker(); } catch (Exception e) { _logger.error("problems restarting broker: " + e); throw e; } // Reconnect to broker Connection connection = getConnectionFactory().createConnection("guest", "guest"); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); topic = new AMQTopic((AMQConnection) connection, "testResubscribeWithChangedSelectorAndRestart"); producer = session.createProducer(topic); //verify no messages now present on the queue after we restart the broker //check the dur sub's underlying queue now has msg count 0 subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart"); assertEquals("Msg count should be 0", 0, ((AMQSession) session).getQueueDepth(subQueue)); // Reconnect with new selector that matches B TopicSubscriber subC = session.createDurableSubscriber(topic, "testResubscribeWithChangedSelectorAndRestart", "Match = False", false); // Check that new messages are still sent and recieved properly msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1"); msg.setBooleanProperty("Match", true); producer.send(msg); msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2"); msg.setBooleanProperty("Match", false); producer.send(msg); //check the dur sub's underlying queue now has msg count 1 subQueue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorAndRestart"); assertEquals("Msg count should be 1", 1, ((AMQSession) session).getQueueDepth(subQueue)); rMsg = subC.receive(1000); assertNotNull(rMsg); assertEquals("Content was wrong", "testResubscribeWithChangedSelectorAndRestart2", ((TextMessage) rMsg).getText()); rMsg = subC.receive(1000); assertNull(rMsg); session.unsubscribe("testResubscribeWithChangedSelectorAndRestart"); subC.close(); session.close(); connection.close(); } }