diff options
Diffstat (limited to 'java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java')
-rw-r--r-- | java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java | 127 |
1 files changed, 119 insertions, 8 deletions
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java index 4e201d5473..e4837b212e 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java @@ -22,16 +22,20 @@ package org.apache.qpid.server.store.berkeleydb; import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME; +import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.PRIORITY_QUEUE_NAME; import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME; +import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_WITH_DLQ_NAME; import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_SUB_NAME; import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME; import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SUB_NAME; import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.TOPIC_NAME; import java.io.File; +import java.io.InputStream; import javax.jms.Connection; import javax.jms.DeliveryMode; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -43,7 +47,10 @@ import javax.jms.TopicConnection; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularDataSupport; +import org.apache.qpid.management.common.mbeans.ManagedExchange; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.test.utils.JMXTestUtils; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -70,7 +77,7 @@ public class BDBUpgradeTest extends QpidBrokerTestCase public void setUp() throws Exception { assertNotNull("QPID_WORK must be set", QPID_WORK_ORIG); - _storeLocation = getWorkDirBaseDir() + "/bdbstore/test-store"; + _storeLocation = getWorkDirBaseDir() + File.separator + "test-store"; //Clear the two target directories if they exist. File directory = new File(_storeLocation); @@ -78,15 +85,13 @@ public class BDBUpgradeTest extends QpidBrokerTestCase { FileUtils.delete(directory, true); } + directory.mkdirs(); // copy store files - String src = getClass().getClassLoader().getResource("upgrade/bdbstore-v4/test-store").toURI().getPath(); - FileUtils.copyRecursive(new File(src), new File(_storeLocation)); - - //override the broker config used and then start the broker with the updated store - _configFile = new File("build/etc/config-systests-bdb.xml"); - setConfigurationProperty("management.enabled", "true"); + InputStream src = getClass().getClassLoader().getResourceAsStream("upgrade/bdbstore-v4/test-store/00000000.jdb"); + FileUtils.copy(src, new File(_storeLocation, "00000000.jdb")); + getBrokerConfiguration().addJmxManagementConfiguration(); super.setUp(); } @@ -302,11 +307,110 @@ public class BDBUpgradeTest extends QpidBrokerTestCase Queue queue = session.createQueue(NON_DURABLE_QUEUE_NAME); MessageConsumer messageConsumer = session.createConsumer(queue); - for (int i = 0; i < 3; i++) + for (int i = 1; i <= 3; i++) { Message message = messageConsumer.receive(1000); assertNotNull("Message was not migrated!", message); assertTrue("Unexpected message received!", message instanceof TextMessage); + assertEquals("ID property did not match", i, message.getIntProperty("ID")); + } + } + + /** + * Tests store upgrade has maintained the priority queue configuration, + * such that sending messages with priorities out-of-order and then consuming + * them gets the messages back in priority order. + */ + public void testPriorityQueue() throws Exception + { + // Create a connection and start it + Connection connection = getConnection(); + connection.start(); + + // send some messages to the priority queue + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(PRIORITY_QUEUE_NAME); + MessageProducer producer = session.createProducer(queue); + + producer.setPriority(4); + producer.send(createMessage(1, false, session, producer)); + producer.setPriority(1); + producer.send(createMessage(2, false, session, producer)); + producer.setPriority(9); + producer.send(createMessage(3, false, session, producer)); + session.close(); + + //consume the messages, expected order: msg 3, msg 1, msg 2. + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queue); + + Message msg = consumer.receive(1500); + assertNotNull("expected message was not received", msg); + assertEquals(3, msg.getIntProperty("msg")); + msg = consumer.receive(1500); + assertNotNull("expected message was not received", msg); + assertEquals(1, msg.getIntProperty("msg")); + msg = consumer.receive(1500); + assertNotNull("expected message was not received", msg); + assertEquals(2, msg.getIntProperty("msg")); + } + + /** + * Test that the queue configured to have a DLQ was recovered and has the alternate exchange + * and max delivery count, the DLE exists, the DLQ exists with no max delivery count, the + * DLQ is bound to the DLE, and that the DLQ does not itself have a DLQ. + * + * DLQs are NOT enabled at the virtualhost level, we are testing recovery of the arguments + * that turned it on for this specific queue. + */ + public void testRecoveryOfQueueWithDLQ() throws Exception + { + JMXTestUtils jmxUtils = null; + try + { + jmxUtils = new JMXTestUtils(this, "guest", "guest"); + jmxUtils.open(); + } + catch (Exception e) + { + fail("Unable to establish JMX connection, test cannot proceed"); + } + + try + { + //verify the DLE exchange exists, has the expected type, and a single binding for the DLQ + ManagedExchange exchange = jmxUtils.getManagedExchange(QUEUE_WITH_DLQ_NAME + "_DLE"); + assertEquals("Wrong exchange type", "fanout", exchange.getExchangeType()); + TabularDataSupport bindings = (TabularDataSupport) exchange.bindings(); + assertEquals(1, bindings.size()); + for(Object o : bindings.values()) + { + CompositeData binding = (CompositeData) o; + + String bindingKey = (String) binding.get(ManagedExchange.BINDING_KEY); + String[] queueNames = (String[]) binding.get(ManagedExchange.QUEUE_NAMES); + + //Because its a fanout exchange, we just return a single '*' key with all bound queues + assertEquals("unexpected binding key", "*", bindingKey); + assertEquals("unexpected number of queues bound", 1, queueNames.length); + assertEquals("unexpected queue name", QUEUE_WITH_DLQ_NAME + "_DLQ", queueNames[0]); + } + + //verify the queue exists, has the expected alternate exchange and max delivery count + ManagedQueue queue = jmxUtils.getManagedQueue(QUEUE_WITH_DLQ_NAME); + assertEquals("Queue does not have the expected AlternateExchange", QUEUE_WITH_DLQ_NAME + "_DLE", queue.getAlternateExchange()); + assertEquals("Unexpected maximum delivery count", Integer.valueOf(2), queue.getMaximumDeliveryCount()); + + ManagedQueue dlQqueue = jmxUtils.getManagedQueue(QUEUE_WITH_DLQ_NAME + "_DLQ"); + assertNull("Queue should not have an AlternateExchange", dlQqueue.getAlternateExchange()); + assertEquals("Unexpected maximum delivery count", Integer.valueOf(0), dlQqueue.getMaximumDeliveryCount()); + + String dlqDlqObjectNameString = jmxUtils.getQueueObjectNameString("test", QUEUE_WITH_DLQ_NAME + "_DLQ" + "_DLQ"); + assertFalse("a DLQ should not exist for the DLQ itself", jmxUtils.doesManagedObjectExist(dlqDlqObjectNameString)); + } + finally + { + jmxUtils.close(); } } @@ -387,4 +491,11 @@ public class BDBUpgradeTest extends QpidBrokerTestCase session.close(); } + private Message createMessage(int msgId, boolean first, Session producerSession, MessageProducer producer) throws JMSException + { + Message send = producerSession.createTextMessage("Message: " + msgId); + send.setIntProperty("msg", msgId); + + return send; + } } |