summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-12-04 15:23:30 +0000
committerRobert Gemmell <robbie@apache.org>2012-12-04 15:23:30 +0000
commitcdf3a3ed338af87bd218683d54643f9ee2d0c5a7 (patch)
tree3d32e3402997f5ea0dd551ccacb7bb076dca5525
parent0f1e3b7bdae8794cee77065111456637b1e4019c (diff)
downloadqpid-python-cdf3a3ed338af87bd218683d54643f9ee2d0c5a7.tar.gz
QPID-4441: add system test verification of the priority queue and queue with DLQ following the store upgrade
merged from trunk r1415127 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.20@1416987 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java111
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java10
2 files changed, 118 insertions, 3 deletions
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
index 4a9efad685..3c0f72c302 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
@@ -22,7 +22,9 @@ package org.apache.qpid.server.store.berkeleydb;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.NON_DURABLE_QUEUE_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.PRIORITY_QUEUE_NAME;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_WITH_DLQ_NAME;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_SUB_NAME;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SELECTOR_TOPIC_NAME;
import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SUB_NAME;
@@ -32,6 +34,7 @@ import java.io.File;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -43,7 +46,10 @@ import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularDataSupport;
+import org.apache.qpid.management.common.mbeans.ManagedExchange;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -311,6 +317,104 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
}
}
+ /**
+ * Tests store upgrade has maintained the priority queue configuration,
+ * such that sending messages with priorities out-of-order and then consuming
+ * them gets the messages back in priority order.
+ */
+ public void testPriorityQueue() throws Exception
+ {
+ // Create a connection and start it
+ Connection connection = getConnection();
+ connection.start();
+
+ // send some messages to the priority queue
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(PRIORITY_QUEUE_NAME);
+ MessageProducer producer = session.createProducer(queue);
+
+ producer.setPriority(4);
+ producer.send(createMessage(1, false, session, producer));
+ producer.setPriority(1);
+ producer.send(createMessage(2, false, session, producer));
+ producer.setPriority(9);
+ producer.send(createMessage(3, false, session, producer));
+ session.close();
+
+ //consume the messages, expected order: msg 3, msg 1, msg 2.
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ Message msg = consumer.receive(1500);
+ assertNotNull("expected message was not received", msg);
+ assertEquals(3, msg.getIntProperty("msg"));
+ msg = consumer.receive(1500);
+ assertNotNull("expected message was not received", msg);
+ assertEquals(1, msg.getIntProperty("msg"));
+ msg = consumer.receive(1500);
+ assertNotNull("expected message was not received", msg);
+ assertEquals(2, msg.getIntProperty("msg"));
+ }
+
+ /**
+ * Test that the queue configured to have a DLQ was recovered and has the alternate exchange
+ * and max delivery count, the DLE exists, the DLQ exists with no max delivery count, the
+ * DLQ is bound to the DLE, and that the DLQ does not itself have a DLQ.
+ *
+ * DLQs are NOT enabled at the virtualhost level, we are testing recovery of the arguments
+ * that turned it on for this specific queue.
+ */
+ public void testRecoveryOfQueueWithDLQ() throws Exception
+ {
+ JMXTestUtils jmxUtils = null;
+ try
+ {
+ jmxUtils = new JMXTestUtils(this, "guest", "guest");
+ jmxUtils.open();
+ }
+ catch (Exception e)
+ {
+ fail("Unable to establish JMX connection, test cannot proceed");
+ }
+
+ try
+ {
+ //verify the DLE exchange exists, has the expected type, and a single binding for the DLQ
+ ManagedExchange exchange = jmxUtils.getManagedExchange(QUEUE_WITH_DLQ_NAME + "_DLE");
+ assertEquals("Wrong exchange type", "fanout", exchange.getExchangeType());
+ TabularDataSupport bindings = (TabularDataSupport) exchange.bindings();
+ assertEquals(1, bindings.size());
+ for(Object o : bindings.values())
+ {
+ CompositeData binding = (CompositeData) o;
+
+ String bindingKey = (String) binding.get(ManagedExchange.BINDING_KEY);
+ String[] queueNames = (String[]) binding.get(ManagedExchange.QUEUE_NAMES);
+
+ //Because its a fanout exchange, we just return a single '*' key with all bound queues
+ assertEquals("unexpected binding key", "*", bindingKey);
+ assertEquals("unexpected number of queues bound", 1, queueNames.length);
+ assertEquals("unexpected queue name", QUEUE_WITH_DLQ_NAME + "_DLQ", queueNames[0]);
+ }
+
+ //verify the queue exists, has the expected alternate exchange and max delivery count
+ ManagedQueue queue = jmxUtils.getManagedQueue(QUEUE_WITH_DLQ_NAME);
+ assertEquals("Queue does not have the expected AlternateExchange", QUEUE_WITH_DLQ_NAME + "_DLE", queue.getAlternateExchange());
+ assertEquals("Unexpected maximum delivery count", Integer.valueOf(2), queue.getMaximumDeliveryCount());
+
+ ManagedQueue dlQqueue = jmxUtils.getManagedQueue(QUEUE_WITH_DLQ_NAME + "_DLQ");
+ assertNull("Queue should not have an AlternateExchange", dlQqueue.getAlternateExchange());
+ assertEquals("Unexpected maximum delivery count", Integer.valueOf(0), dlQqueue.getMaximumDeliveryCount());
+
+ String dlqDlqObjectNameString = jmxUtils.getQueueObjectNameString("test", QUEUE_WITH_DLQ_NAME + "_DLQ" + "_DLQ");
+ assertFalse("a DLQ should not exist for the DLQ itself", jmxUtils.doesManagedObjectExist(dlqDlqObjectNameString));
+ }
+ finally
+ {
+ jmxUtils.close();
+ }
+ }
+
private void consumeDurableSubscriptionMessages(Connection connection, boolean selector) throws Exception
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -388,4 +492,11 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
session.close();
}
+ private Message createMessage(int msgId, boolean first, Session producerSession, MessageProducer producer) throws JMSException
+ {
+ Message send = producerSession.createTextMessage("Message: " + msgId);
+ send.setIntProperty("msg", msgId);
+
+ return send;
+ }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
index 1891231f8c..826d00d0df 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
@@ -286,9 +286,7 @@ public class JMXTestUtils
public ObjectName getQueueObjectName(String virtualHostName, String queue)
{
// Get the name of the test manager
- String query = "org.apache.qpid:type=VirtualHost.Queue,VirtualHost="
- + ObjectName.quote(virtualHostName) + ",name="
- + ObjectName.quote(queue) + ",*";
+ String query = getQueueObjectNameString(virtualHostName, queue);
Set<ObjectName> objectNames = queryObjects(query);
@@ -301,6 +299,12 @@ public class JMXTestUtils
return objectName;
}
+ public String getQueueObjectNameString(String virtualHostName, String queue) {
+ return "org.apache.qpid:type=VirtualHost.Queue,VirtualHost="
+ + ObjectName.quote(virtualHostName) + ",name="
+ + ObjectName.quote(queue) + ",*";
+ }
+
/**
* Generate the ObjectName for the given Exchange on a VirtualHost.
*/