summaryrefslogtreecommitdiff
path: root/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java')
-rw-r--r--java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java62
1 files changed, 60 insertions, 2 deletions
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
index 122f846a2d..390d667db0 100644
--- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
+++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.store.berkeleydb;
+import java.util.HashMap;
+import java.util.Map;
+
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@@ -35,6 +38,11 @@ import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQDestination;
@@ -62,6 +70,11 @@ public class BDBStoreUpgradeTestPreparer
public static final String QUEUE_NAME="myUpgradeQueue";
public static final String NON_DURABLE_QUEUE_NAME="queue-non-durable";
+ public static final String PRIORITY_QUEUE_NAME="myPriorityQueue";
+ public static final String QUEUE_WITH_DLQ_NAME="myQueueWithDLQ";
+ public static final String NONEXCLUSIVE_WITH_ERRONEOUS_OWNER = "nonexclusive-with-erroneous-owner";
+ public static final String MISUSED_OWNER = "misused-owner-as-description";
+
private static AMQConnectionFactory _connFac;
private static final String CONN_URL =
"amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'";
@@ -86,10 +99,10 @@ public class BDBStoreUpgradeTestPreparer
{
Connection connection = _connFac.createConnection();
AMQSession<?, ?> session = (AMQSession<?,?>)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- AMQShortString queueName = AMQShortString.valueOf(NON_DURABLE_QUEUE_NAME);
+ AMQShortString queueName = new AMQShortString(NON_DURABLE_QUEUE_NAME);
AMQDestination destination = (AMQDestination) session.createQueue(NON_DURABLE_QUEUE_NAME);
session.sendCreateQueue(queueName, false, false, false, null);
- session.bindQueue(queueName, queueName, null, AMQShortString.valueOf("amq.direct"), destination);
+ session.bindQueue(queueName, queueName, null, new AMQShortString("amq.direct"), destination);
MessageProducer messageProducer = session.createProducer(destination);
sendMessages(session, messageProducer, destination, DeliveryMode.PERSISTENT, 1024, 3);
connection.close();
@@ -140,11 +153,56 @@ public class BDBStoreUpgradeTestPreparer
// Publish 5 persistent messages which will NOT be committed and so should be 'lost'
sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 1*1024, 5);
+ messageProducer.close();
+ session.close();
+
+ session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ // Create a priority queue on broker
+ final Map<String,Object> priorityQueueArguments = new HashMap<String, Object>();
+ priorityQueueArguments.put("x-qpid-priorities",10);
+ createAndBindQueueOnBroker(session, PRIORITY_QUEUE_NAME, priorityQueueArguments);
+
+ // Create a queue that has a DLQ
+ final Map<String,Object> queueWithDLQArguments = new HashMap<String, Object>();
+ queueWithDLQArguments.put("x-qpid-dlq-enabled", true);
+ queueWithDLQArguments.put("x-qpid-maximum-delivery-count", 2);
+ createAndBindQueueOnBroker(session, QUEUE_WITH_DLQ_NAME, queueWithDLQArguments);
+
+ // Send message to the DLQ
+ Queue dlq = session.createQueue("fanout://" + QUEUE_WITH_DLQ_NAME + "_DLE//does-not-matter");
+ MessageProducer dlqMessageProducer = session.createProducer(dlq);
+ sendMessages(session, dlqMessageProducer, dlq, DeliveryMode.PERSISTENT, 1*1024, 1);
+ session.commit();
+ // Create a queue with JMX specifying an owner, so it can later be moved into description
+ createAndBindQueueOnBrokerWithJMX(NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, MISUSED_OWNER, priorityQueueArguments);
session.close();
connection.close();
}
+ private void createAndBindQueueOnBroker(Session session, String queueName, final Map<String, Object> arguments) throws Exception
+ {
+ ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), false, true, false, arguments);
+ Queue queue = (Queue) session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='true'");
+ ((AMQSession<?,?>) session).declareAndBind((AMQDestination)queue);
+ }
+
+ private void createAndBindQueueOnBrokerWithJMX(String queueName, String owner, final Map<String, Object> arguments) throws Exception
+ {
+ Map<String, Object> environment = new HashMap<String, Object>();
+ environment.put(JMXConnector.CREDENTIALS, new String[] {"admin","admin"});
+ JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:8999/jmxrmi");
+ JMXConnector jmxConnector = JMXConnectorFactory.connect(url, environment);
+ MBeanServerConnection mbsc = jmxConnector.getMBeanServerConnection();
+ ObjectName virtualHost = new ObjectName("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"test\"");
+
+ Object[] params = new Object[] {queueName, owner, true, arguments};
+ String[] signature = new String[] {String.class.getName(), String.class.getName(), boolean.class.getName(), Map.class.getName()};
+ mbsc.invoke(virtualHost, "createNewQueue", params, signature);
+
+ ObjectName directExchange = new ObjectName("org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=\"test\",name=\"amq.direct\",ExchangeType=direct");
+ mbsc.invoke(directExchange, "createNewBinding", new Object[] {queueName, queueName}, new String[] {String.class.getName(), String.class.getName()});
+ }
/**
* Prepare a DurableSubscription backing queue for use in testing selector
* recovery and queue exclusivity marking during the upgrade process.