diff options
Diffstat (limited to 'qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java')
-rw-r--r-- | qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java | 548 |
1 files changed, 548 insertions, 0 deletions
diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java new file mode 100644 index 0000000000..491856d953 --- /dev/null +++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java @@ -0,0 +1,548 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.io.InputStream; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularDataSupport; + +import org.apache.qpid.management.common.mbeans.ManagedExchange; +import org.apache.qpid.management.common.mbeans.ManagedQueue; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBVirtualHostNode; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.test.utils.TestBrokerConfiguration; +import org.apache.qpid.util.FileUtils; +import org.apache.qpid.util.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests upgrading a BDB store on broker startup. + * The store will then be used to verify that the upgrade is completed + * properly and that once upgraded it functions as expected. + * + * Store prepared using old client/broker with BDBStoreUpgradeTestPreparer. + */ +public class BDBUpgradeTest extends QpidBrokerTestCase +{ + protected static final Logger _logger = LoggerFactory.getLogger(BDBUpgradeTest.class); + + private static final String QPID_WORK_ORIG = System.getProperty("QPID_WORK"); + + private static final String STRING_1024 = generateString(1024); + private static final String STRING_1024_256 = generateString(1024*256); + + private static final String TOPIC_NAME="myUpgradeTopic"; + private static final String SUB_NAME="myDurSubName"; + private static final String SELECTOR_SUB_NAME="mySelectorDurSubName"; + private static final String SELECTOR_TOPIC_NAME="mySelectorUpgradeTopic"; + private static final String QUEUE_NAME="myUpgradeQueue"; + private static final String NON_DURABLE_QUEUE_NAME="queue-non-durable"; + private static final String PRIORITY_QUEUE_NAME="myPriorityQueue"; + private static final String QUEUE_WITH_DLQ_NAME="myQueueWithDLQ"; + + private String _storeLocation; + + @Override + public void setUp() throws Exception + { + assertNotNull("QPID_WORK must be set", QPID_WORK_ORIG); + Map<String, Object> virtualHostNodeAttributes = getBrokerConfiguration().getObjectAttributes(VirtualHostNode.class, TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST); + _storeLocation = Strings.expand((String)virtualHostNodeAttributes.get(BDBVirtualHostNode.STORE_PATH)); + + //Clear the two target directories if they exist. + File directory = new File(_storeLocation); + if (directory.exists() && directory.isDirectory()) + { + FileUtils.delete(directory, true); + } + directory.mkdirs(); + + // copy store files + InputStream src = getClass().getClassLoader().getResourceAsStream("upgrade/bdbstore-v4/test-store/00000000.jdb"); + FileUtils.copy(src, new File(_storeLocation, "00000000.jdb")); + + getBrokerConfiguration().addJmxManagementConfiguration(); + super.setUp(); + } + + /** + * Test that the selector applied to the DurableSubscription was successfully + * transfered to the new store, and functions as expected with continued use + * by monitoring message count while sending new messages to the topic and then + * consuming them. + */ + public void testSelectorDurability() throws Exception + { + JMXTestUtils jmxUtils = null; + try + { + jmxUtils = new JMXTestUtils(this, "guest", "guest"); + jmxUtils.open(); + } + catch (Exception e) + { + fail("Unable to establish JMX connection, test cannot proceed"); + } + + try + { + ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SELECTOR_SUB_NAME); + assertEquals("DurableSubscription backing queue should have 1 message on it initially", + new Integer(1), dursubQueue.getMessageCount()); + + // Create a connection and start it + TopicConnection connection = (TopicConnection) getConnection(); + connection.start(); + + // Send messages which don't match and do match the selector, checking message count + TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED); + Topic topic = pubSession.createTopic(SELECTOR_TOPIC_NAME); + TopicPublisher publisher = pubSession.createPublisher(topic); + + publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false"); + pubSession.commit(); + assertEquals("DurableSubscription backing queue should still have 1 message on it", + Integer.valueOf(1), dursubQueue.getMessageCount()); + + publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true"); + pubSession.commit(); + assertEquals("DurableSubscription backing queue should now have 2 messages on it", + Integer.valueOf(2), dursubQueue.getMessageCount()); + + TopicSubscriber durSub = pubSession.createDurableSubscriber(topic, SELECTOR_SUB_NAME,"testprop='true'", false); + Message m = durSub.receive(2000); + assertNotNull("Failed to receive an expected message", m); + m = durSub.receive(2000); + assertNotNull("Failed to receive an expected message", m); + pubSession.commit(); + + pubSession.close(); + } + finally + { + jmxUtils.close(); + } + } + + /** + * Test that the DurableSubscription without selector was successfully + * transfered to the new store, and functions as expected with continued use. + */ + public void testDurableSubscriptionWithoutSelector() throws Exception + { + JMXTestUtils jmxUtils = null; + try + { + jmxUtils = new JMXTestUtils(this, "guest", "guest"); + jmxUtils.open(); + } + catch (Exception e) + { + fail("Unable to establish JMX connection, test cannot proceed"); + } + + try + { + ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SUB_NAME); + assertEquals("DurableSubscription backing queue should have 1 message on it initially", + new Integer(1), dursubQueue.getMessageCount()); + + // Create a connection and start it + TopicConnection connection = (TopicConnection) getConnection(); + connection.start(); + + // Send new message matching the topic, checking message count + TopicSession session = connection.createTopicSession(true, Session.SESSION_TRANSACTED); + Topic topic = session.createTopic(TOPIC_NAME); + TopicPublisher publisher = session.createPublisher(topic); + + publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "indifferent"); + session.commit(); + assertEquals("DurableSubscription backing queue should now have 2 messages on it", + Integer.valueOf(2), dursubQueue.getMessageCount()); + + TopicSubscriber durSub = session.createDurableSubscriber(topic, SUB_NAME); + Message m = durSub.receive(2000); + assertNotNull("Failed to receive an expected message", m); + m = durSub.receive(2000); + assertNotNull("Failed to receive an expected message", m); + + session.commit(); + session.close(); + } + finally + { + jmxUtils.close(); + } + } + + /** + * Test that the backing queue for the durable subscription created was successfully + * detected and set as being exclusive during the upgrade process, and that the + * regular queue was not. + */ + public void testQueueExclusivity() throws Exception + { + JMXTestUtils jmxUtils = null; + try + { + jmxUtils = new JMXTestUtils(this, "guest", "guest"); + jmxUtils.open(); + } + catch (Exception e) + { + fail("Unable to establish JMX connection, test cannot proceed"); + } + + try + { + ManagedQueue queue = jmxUtils.getManagedQueue(QUEUE_NAME); + assertFalse("Queue should not have been marked as Exclusive during upgrade", queue.isExclusive()); + + ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SUB_NAME); + assertTrue("DurableSubscription backing queue should have been marked as Exclusive during upgrade", dursubQueue.isExclusive()); + } + finally + { + jmxUtils.close(); + } + } + + /** + * Test that the upgraded queue continues to function properly when used + * for persistent messaging and restarting the broker. + * + * Sends the new messages to the queue BEFORE consuming those which were + * sent before the upgrade. In doing so, this also serves to test that + * the queue bindings were successfully transitioned during the upgrade. + */ + public void testBindingAndMessageDurabability() throws Exception + { + // Create a connection and start it + TopicConnection connection = (TopicConnection) getConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(QUEUE_NAME); + MessageProducer messageProducer = session.createProducer(queue); + + // Send a new message + sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 256*1024, 1); + + session.close(); + + // Restart the broker + restartBroker(); + + // Drain the queue of all messages + connection = (TopicConnection) getConnection(); + connection.start(); + consumeQueueMessages(connection, true); + } + + /** + * Test that all of the committed persistent messages previously sent to + * the broker are properly received following update of the MetaData and + * Content entries during the store upgrade process. + */ + public void testConsumptionOfUpgradedMessages() throws Exception + { + // Create a connection and start it + Connection connection = getConnection(); + connection.start(); + + consumeDurableSubscriptionMessages(connection, true); + consumeDurableSubscriptionMessages(connection, false); + consumeQueueMessages(connection, false); + } + + /** + * Tests store migration containing messages for non-existing queue. + * + * @throws Exception + */ + public void testMigrationOfMessagesForNonDurableQueues() throws Exception + { + // Create a connection and start it + Connection connection = getConnection(); + connection.start(); + + // consume a message for non-existing store + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(NON_DURABLE_QUEUE_NAME); + MessageConsumer messageConsumer = session.createConsumer(queue); + + for (int i = 1; i <= 3; i++) + { + Message message = messageConsumer.receive(1000); + assertNotNull("Message was not migrated!", message); + assertTrue("Unexpected message received!", message instanceof TextMessage); + assertEquals("ID property did not match", i, message.getIntProperty("ID")); + } + } + + /** + * Tests store upgrade has maintained the priority queue configuration, + * such that sending messages with priorities out-of-order and then consuming + * them gets the messages back in priority order. + */ + public void testPriorityQueue() throws Exception + { + // Create a connection and start it + Connection connection = getConnection(); + connection.start(); + + // send some messages to the priority queue + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(PRIORITY_QUEUE_NAME); + MessageProducer producer = session.createProducer(queue); + + producer.setPriority(4); + producer.send(createMessage(1, false, session, producer)); + producer.setPriority(1); + producer.send(createMessage(2, false, session, producer)); + producer.setPriority(9); + producer.send(createMessage(3, false, session, producer)); + session.close(); + + //consume the messages, expected order: msg 3, msg 1, msg 2. + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queue); + + Message msg = consumer.receive(1500); + assertNotNull("expected message was not received", msg); + assertEquals(3, msg.getIntProperty("msg")); + msg = consumer.receive(1500); + assertNotNull("expected message was not received", msg); + assertEquals(1, msg.getIntProperty("msg")); + msg = consumer.receive(1500); + assertNotNull("expected message was not received", msg); + assertEquals(2, msg.getIntProperty("msg")); + } + + /** + * + * TODO (QPID-5650) Resolve so this test can be reenabled. + * + * Test that the queue configured to have a DLQ was recovered and has the alternate exchange + * and max delivery count, the DLE exists, the DLQ exists with no max delivery count, the + * DLQ is bound to the DLE, and that the DLQ does not itself have a DLQ. + * + * DLQs are NOT enabled at the virtualhost level, we are testing recovery of the arguments + * that turned it on for this specific queue. + */ + public void xtestRecoveryOfQueueWithDLQ() throws Exception + { + JMXTestUtils jmxUtils = null; + try + { + jmxUtils = new JMXTestUtils(this, "guest", "guest"); + jmxUtils.open(); + } + catch (Exception e) + { + fail("Unable to establish JMX connection, test cannot proceed"); + } + + try + { + //verify the DLE exchange exists, has the expected type, and a single binding for the DLQ + ManagedExchange exchange = jmxUtils.getManagedExchange(QUEUE_WITH_DLQ_NAME + "_DLE"); + assertEquals("Wrong exchange type", "fanout", exchange.getExchangeType()); + TabularDataSupport bindings = (TabularDataSupport) exchange.bindings(); + assertEquals(1, bindings.size()); + for(Object o : bindings.values()) + { + CompositeData binding = (CompositeData) o; + + String bindingKey = (String) binding.get(ManagedExchange.BINDING_KEY); + String[] queueNames = (String[]) binding.get(ManagedExchange.QUEUE_NAMES); + + //Because its a fanout exchange, we just return a single '*' key with all bound queues + assertEquals("unexpected binding key", "*", bindingKey); + assertEquals("unexpected number of queues bound", 1, queueNames.length); + assertEquals("unexpected queue name", QUEUE_WITH_DLQ_NAME + "_DLQ", queueNames[0]); + } + + //verify the queue exists, has the expected alternate exchange and max delivery count + ManagedQueue queue = jmxUtils.getManagedQueue(QUEUE_WITH_DLQ_NAME); + assertEquals("Queue does not have the expected AlternateExchange", QUEUE_WITH_DLQ_NAME + "_DLE", queue.getAlternateExchange()); + assertEquals("Unexpected maximum delivery count", Integer.valueOf(2), queue.getMaximumDeliveryCount()); + + ManagedQueue dlQqueue = jmxUtils.getManagedQueue(QUEUE_WITH_DLQ_NAME + "_DLQ"); + assertNull("Queue should not have an AlternateExchange", dlQqueue.getAlternateExchange()); + assertEquals("Unexpected maximum delivery count", Integer.valueOf(0), dlQqueue.getMaximumDeliveryCount()); + + String dlqDlqObjectNameString = jmxUtils.getQueueObjectNameString("test", QUEUE_WITH_DLQ_NAME + "_DLQ" + "_DLQ"); + assertFalse("a DLQ should not exist for the DLQ itself", jmxUtils.doesManagedObjectExist(dlqDlqObjectNameString)); + } + finally + { + jmxUtils.close(); + } + } + + private void consumeDurableSubscriptionMessages(Connection connection, boolean selector) throws Exception + { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = null; + TopicSubscriber durSub = null; + + if(selector) + { + topic = session.createTopic(SELECTOR_TOPIC_NAME); + durSub = session.createDurableSubscriber(topic, SELECTOR_SUB_NAME,"testprop='true'", false); + } + else + { + topic = session.createTopic(TOPIC_NAME); + durSub = session.createDurableSubscriber(topic, SUB_NAME); + } + + + // Retrieve the matching message + Message m = durSub.receive(2000); + assertNotNull("Failed to receive an expected message", m); + if(selector) + { + assertEquals("Selector property did not match", "true", m.getStringProperty("testprop")); + } + assertEquals("ID property did not match", 1, m.getIntProperty("ID")); + assertEquals("Message content was not as expected", generateString(1024) , ((TextMessage)m).getText()); + + // Verify that no more messages are received + m = durSub.receive(1000); + assertNull("No more messages should have been recieved", m); + + durSub.close(); + session.close(); + } + + private void consumeQueueMessages(Connection connection, boolean extraMessage) throws Exception + { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(QUEUE_NAME); + + MessageConsumer consumer = session.createConsumer(queue); + Message m; + + // Retrieve the initial pre-upgrade messages + for (int i=1; i <= 5 ; i++) + { + m = consumer.receive(2000); + assertNotNull("Failed to receive an expected message", m); + assertEquals("ID property did not match", i, m.getIntProperty("ID")); + assertEquals("Message content was not as expected", STRING_1024_256, ((TextMessage)m).getText()); + } + for (int i=1; i <= 5 ; i++) + { + m = consumer.receive(2000); + assertNotNull("Failed to receive an expected message", m); + assertEquals("ID property did not match", i, m.getIntProperty("ID")); + assertEquals("Message content was not as expected", STRING_1024, ((TextMessage)m).getText()); + } + + if(extraMessage) + { + //verify that the extra message is received + m = consumer.receive(2000); + assertNotNull("Failed to receive an expected message", m); + assertEquals("ID property did not match", 1, m.getIntProperty("ID")); + assertEquals("Message content was not as expected", STRING_1024_256, ((TextMessage)m).getText()); + } + + // Verify that no more messages are received + m = consumer.receive(1000); + assertNull("No more messages should have been recieved", m); + + consumer.close(); + session.close(); + } + + private Message createMessage(int msgId, boolean first, Session producerSession, MessageProducer producer) throws JMSException + { + Message send = producerSession.createTextMessage("Message: " + msgId); + send.setIntProperty("msg", msgId); + + return send; + } + + /** + * Generates a string of a given length consisting of the sequence 0,1,2,..,9,0,1,2. + * + * @param length number of characters in the string + * @return string sequence of the given length + */ + private static String generateString(int length) + { + char[] base_chars = new char[]{'0','1','2','3','4','5','6','7','8','9'}; + char[] chars = new char[length]; + for (int i = 0; i < (length); i++) + { + chars[i] = base_chars[i % 10]; + } + return new String(chars); + } + + private static void sendMessages(Session session, MessageProducer messageProducer, + Destination dest, int deliveryMode, int length, int numMesages) throws JMSException + { + for (int i = 1; i <= numMesages; i++) + { + Message message = session.createTextMessage(generateString(length)); + message.setIntProperty("ID", i); + messageProducer.send(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + } + } + + private static void publishMessages(Session session, TopicPublisher publisher, + Destination dest, int deliveryMode, int length, int numMesages, String selectorProperty) throws JMSException + { + for (int i = 1; i <= numMesages; i++) + { + Message message = session.createTextMessage(generateString(length)); + message.setIntProperty("ID", i); + message.setStringProperty("testprop", selectorProperty); + publisher.publish(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + } + } +} |