diff options
Diffstat (limited to 'qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java')
-rw-r--r-- | qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java | 232 |
1 files changed, 232 insertions, 0 deletions
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java new file mode 100644 index 0000000000..cc19bcf5d8 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java @@ -0,0 +1,232 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; + +import junit.framework.TestCase; + +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.url.URLSyntaxException; + +/** + * Prepares an older version brokers BDB store with the required + * contents for use in the BDBStoreUpgradeTest. + * + * The store will then be used to verify that the upgraded is + * completed properly and that once upgraded it functions as + * expected with the new broker. + */ +public class BDBStoreUpgradeTestPreparer extends TestCase +{ + public static final String TOPIC_NAME="myUpgradeTopic"; + public static final String SUB_NAME="myDurSubName"; + public static final String QUEUE_NAME="myUpgradeQueue"; + + private static AMQConnectionFactory _connFac; + private static final String CONN_URL = + "amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'"; + + /** + * Create a BDBStoreUpgradeTestPreparer instance + */ + public BDBStoreUpgradeTestPreparer () throws URLSyntaxException + { + _connFac = new AMQConnectionFactory(CONN_URL); + } + + /** + * Utility test method to allow running the preparation tool + * using the test framework + */ + public void testPrepareBroker() throws Exception + { + prepareBroker(); + } + + private void prepareBroker() throws Exception + { + prepareQueues(); + prepareDurableSubscription(); + } + + /** + * Prepare a queue for use in testing message and binding recovery + * after the upgrade is performed. + * + * - Create a transacted session on the connection. + * - Use a consumer to create the (durable by default) queue. + * - Send 5 large messages to test (multi-frame) content recovery. + * - Send 1 small message to test (single-frame) content recovery. + * - Commit the session. + * - Send 5 small messages to test that uncommitted messages are not recovered. + * following the upgrade. + * - Close the session. + */ + private void prepareQueues() throws Exception + { + // Create a connection + Connection connection = _connFac.createConnection(); + connection.start(); + connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException e) + { + e.printStackTrace(); + } + }); + // Create a session on the connection, transacted to confirm delivery + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(QUEUE_NAME); + // Create a consumer to ensure the queue gets created + // (and enter it into the store, as queues are made durable by default) + MessageConsumer messageConsumer = session.createConsumer(queue); + messageConsumer.close(); + + // Create a Message producer + MessageProducer messageProducer = session.createProducer(queue); + + // Publish 5 persistent messages, 256k chars to ensure they are multi-frame + sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 256*1024, 5); + // Publish 5 persistent messages, 1k chars to ensure they are single-frame + sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 1*1024, 5); + + session.commit(); + + // Publish 5 persistent messages which will NOT be committed and so should be 'lost' + sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 1*1024, 5); + + session.close(); + connection.close(); + } + + /** + * Prepare a DurableSubscription backing queue for use in testing selector + * recovery and queue exclusivity marking during the upgrade process. + * + * - Create a transacted session on the connection. + * - Open and close a DurableSubscription with selector to create the backing queue. + * - Send a message which matches the selector. + * - Send a message which does not match the selector. + * - Send a message which matches the selector but will remain uncommitted. + * - Close the session. + */ + private void prepareDurableSubscription() throws Exception + { + + // Create a connection + TopicConnection connection = _connFac.createTopicConnection(); + connection.start(); + connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException e) + { + e.printStackTrace(); + } + }); + // Create a session on the connection, transacted to confirm delivery + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Topic topic = session.createTopic(TOPIC_NAME); + + // Create and register a durable subscriber with selector and then close it + TopicSubscriber durSub1 = session.createDurableSubscriber(topic, SUB_NAME,"testprop='true'", false); + durSub1.close(); + + // Create a publisher and send a persistent message which matches the selector + // followed by one that does not match, and another which matches but is not + // committed and so should be 'lost' + TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED); + TopicPublisher publisher = pubSession.createPublisher(topic); + + publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true"); + publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false"); + pubSession.commit(); + publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true"); + + publisher.close(); + pubSession.close(); + + } + + public static void sendMessages(Session session, MessageProducer messageProducer, + Destination dest, int deliveryMode, int length, int numMesages) throws JMSException + { + for (int i = 1; i <= numMesages; i++) + { + Message message = session.createTextMessage(generateString(length)); + message.setIntProperty("ID", i); + messageProducer.send(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + } + } + + public static void publishMessages(Session session, TopicPublisher publisher, + Destination dest, int deliveryMode, int length, int numMesages, String selectorProperty) throws JMSException + { + for (int i = 1; i <= numMesages; i++) + { + Message message = session.createTextMessage(generateString(length)); + message.setIntProperty("ID", i); + message.setStringProperty("testprop", selectorProperty); + publisher.publish(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + } + } + + /** + * Generates a string of a given length consisting of the sequence 0,1,2,..,9,0,1,2. + * + * @param length number of characters in the string + * @return string sequence of the given length + */ + public static String generateString(int length) + { + char[] base_chars = new char[]{'0','1','2','3','4','5','6','7','8','9'}; + char[] chars = new char[length]; + for (int i = 0; i < (length); i++) + { + chars[i] = base_chars[i % 10]; + } + return new String(chars); + } + + /** + * Run the preparation tool. + * @param args Command line arguments. + */ + public static void main(String[] args) throws Exception + { + BDBStoreUpgradeTestPreparer producer = new BDBStoreUpgradeTestPreparer(); + producer.prepareBroker(); + } +}
\ No newline at end of file |