summaryrefslogtreecommitdiff
path: root/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java')
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java127
1 files changed, 119 insertions, 8 deletions
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
index 4e201d5473..e4837b212e 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
@@ -22,16 +22,20 @@ package org.apache.qpid.server.store.berkeleydb;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.PRIORITY_QUEUE_NAME;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_WITH_DLQ_NAME;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_SUB_NAME;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SUB_NAME;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.TOPIC_NAME;
import java.io.File;
+import java.io.InputStream;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -43,7 +47,10 @@ import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularDataSupport;
+import org.apache.qpid.management.common.mbeans.ManagedExchange;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -70,7 +77,7 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
public void setUp() throws Exception
{
assertNotNull("QPID_WORK must be set", QPID_WORK_ORIG);
- _storeLocation = getWorkDirBaseDir() + "/bdbstore/test-store";
+ _storeLocation = getWorkDirBaseDir() + File.separator + "test-store";
//Clear the two target directories if they exist.
File directory = new File(_storeLocation);
@@ -78,15 +85,13 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
{
FileUtils.delete(directory, true);
}
+ directory.mkdirs();
// copy store files
- String src = getClass().getClassLoader().getResource("upgrade/bdbstore-v4/test-store").toURI().getPath();
- FileUtils.copyRecursive(new File(src), new File(_storeLocation));
-
- //override the broker config used and then start the broker with the updated store
- _configFile = new File("build/etc/config-systests-bdb.xml");
- setConfigurationProperty("management.enabled", "true");
+ InputStream src = getClass().getClassLoader().getResourceAsStream("upgrade/bdbstore-v4/test-store/00000000.jdb");
+ FileUtils.copy(src, new File(_storeLocation, "00000000.jdb"));
+ getBrokerConfiguration().addJmxManagementConfiguration();
super.setUp();
}
@@ -302,11 +307,110 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
Queue queue = session.createQueue(NON_DURABLE_QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);
- for (int i = 0; i < 3; i++)
+ for (int i = 1; i <= 3; i++)
{
Message message = messageConsumer.receive(1000);
assertNotNull("Message was not migrated!", message);
assertTrue("Unexpected message received!", message instanceof TextMessage);
+ assertEquals("ID property did not match", i, message.getIntProperty("ID"));
+ }
+ }
+
+ /**
+ * Tests store upgrade has maintained the priority queue configuration,
+ * such that sending messages with priorities out-of-order and then consuming
+ * them gets the messages back in priority order.
+ */
+ public void testPriorityQueue() throws Exception
+ {
+ // Create a connection and start it
+ Connection connection = getConnection();
+ connection.start();
+
+ // send some messages to the priority queue
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(PRIORITY_QUEUE_NAME);
+ MessageProducer producer = session.createProducer(queue);
+
+ producer.setPriority(4);
+ producer.send(createMessage(1, false, session, producer));
+ producer.setPriority(1);
+ producer.send(createMessage(2, false, session, producer));
+ producer.setPriority(9);
+ producer.send(createMessage(3, false, session, producer));
+ session.close();
+
+ //consume the messages, expected order: msg 3, msg 1, msg 2.
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ Message msg = consumer.receive(1500);
+ assertNotNull("expected message was not received", msg);
+ assertEquals(3, msg.getIntProperty("msg"));
+ msg = consumer.receive(1500);
+ assertNotNull("expected message was not received", msg);
+ assertEquals(1, msg.getIntProperty("msg"));
+ msg = consumer.receive(1500);
+ assertNotNull("expected message was not received", msg);
+ assertEquals(2, msg.getIntProperty("msg"));
+ }
+
+ /**
+ * Test that the queue configured to have a DLQ was recovered and has the alternate exchange
+ * and max delivery count, the DLE exists, the DLQ exists with no max delivery count, the
+ * DLQ is bound to the DLE, and that the DLQ does not itself have a DLQ.
+ *
+ * DLQs are NOT enabled at the virtualhost level, we are testing recovery of the arguments
+ * that turned it on for this specific queue.
+ */
+ public void testRecoveryOfQueueWithDLQ() throws Exception
+ {
+ JMXTestUtils jmxUtils = null;
+ try
+ {
+ jmxUtils = new JMXTestUtils(this, "guest", "guest");
+ jmxUtils.open();
+ }
+ catch (Exception e)
+ {
+ fail("Unable to establish JMX connection, test cannot proceed");
+ }
+
+ try
+ {
+ //verify the DLE exchange exists, has the expected type, and a single binding for the DLQ
+ ManagedExchange exchange = jmxUtils.getManagedExchange(QUEUE_WITH_DLQ_NAME + "_DLE");
+ assertEquals("Wrong exchange type", "fanout", exchange.getExchangeType());
+ TabularDataSupport bindings = (TabularDataSupport) exchange.bindings();
+ assertEquals(1, bindings.size());
+ for(Object o : bindings.values())
+ {
+ CompositeData binding = (CompositeData) o;
+
+ String bindingKey = (String) binding.get(ManagedExchange.BINDING_KEY);
+ String[] queueNames = (String[]) binding.get(ManagedExchange.QUEUE_NAMES);
+
+ //Because its a fanout exchange, we just return a single '*' key with all bound queues
+ assertEquals("unexpected binding key", "*", bindingKey);
+ assertEquals("unexpected number of queues bound", 1, queueNames.length);
+ assertEquals("unexpected queue name", QUEUE_WITH_DLQ_NAME + "_DLQ", queueNames[0]);
+ }
+
+ //verify the queue exists, has the expected alternate exchange and max delivery count
+ ManagedQueue queue = jmxUtils.getManagedQueue(QUEUE_WITH_DLQ_NAME);
+ assertEquals("Queue does not have the expected AlternateExchange", QUEUE_WITH_DLQ_NAME + "_DLE", queue.getAlternateExchange());
+ assertEquals("Unexpected maximum delivery count", Integer.valueOf(2), queue.getMaximumDeliveryCount());
+
+ ManagedQueue dlQqueue = jmxUtils.getManagedQueue(QUEUE_WITH_DLQ_NAME + "_DLQ");
+ assertNull("Queue should not have an AlternateExchange", dlQqueue.getAlternateExchange());
+ assertEquals("Unexpected maximum delivery count", Integer.valueOf(0), dlQqueue.getMaximumDeliveryCount());
+
+ String dlqDlqObjectNameString = jmxUtils.getQueueObjectNameString("test", QUEUE_WITH_DLQ_NAME + "_DLQ" + "_DLQ");
+ assertFalse("a DLQ should not exist for the DLQ itself", jmxUtils.doesManagedObjectExist(dlqDlqObjectNameString));
+ }
+ finally
+ {
+ jmxUtils.close();
}
}
@@ -387,4 +491,11 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
session.close();
}
+ private Message createMessage(int msgId, boolean first, Session producerSession, MessageProducer producer) throws JMSException
+ {
+ Message send = producerSession.createTextMessage("Message: " + msgId);
+ send.setIntProperty("msg", msgId);
+
+ return send;
+ }
}