diff options
Diffstat (limited to 'java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java')
-rw-r--r-- | java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java | 62 |
1 files changed, 60 insertions, 2 deletions
diff --git a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java index 122f846a2d..390d667db0 100644 --- a/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java +++ b/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.store.berkeleydb; +import java.util.HashMap; +import java.util.Map; + import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -35,6 +38,11 @@ import javax.jms.TopicConnection; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; +import javax.management.MBeanServerConnection; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQDestination; @@ -62,6 +70,11 @@ public class BDBStoreUpgradeTestPreparer public static final String QUEUE_NAME="myUpgradeQueue"; public static final String NON_DURABLE_QUEUE_NAME="queue-non-durable"; + public static final String PRIORITY_QUEUE_NAME="myPriorityQueue"; + public static final String QUEUE_WITH_DLQ_NAME="myQueueWithDLQ"; + public static final String NONEXCLUSIVE_WITH_ERRONEOUS_OWNER = "nonexclusive-with-erroneous-owner"; + public static final String MISUSED_OWNER = "misused-owner-as-description"; + private static AMQConnectionFactory _connFac; private static final String CONN_URL = "amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'"; @@ -86,10 +99,10 @@ public class BDBStoreUpgradeTestPreparer { Connection connection = _connFac.createConnection(); AMQSession<?, ?> session = (AMQSession<?,?>)connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - AMQShortString queueName = AMQShortString.valueOf(NON_DURABLE_QUEUE_NAME); + AMQShortString queueName = new AMQShortString(NON_DURABLE_QUEUE_NAME); AMQDestination destination = (AMQDestination) session.createQueue(NON_DURABLE_QUEUE_NAME); session.sendCreateQueue(queueName, false, false, false, null); - session.bindQueue(queueName, queueName, null, AMQShortString.valueOf("amq.direct"), destination); + session.bindQueue(queueName, queueName, null, new AMQShortString("amq.direct"), destination); MessageProducer messageProducer = session.createProducer(destination); sendMessages(session, messageProducer, destination, DeliveryMode.PERSISTENT, 1024, 3); connection.close(); @@ -140,11 +153,56 @@ public class BDBStoreUpgradeTestPreparer // Publish 5 persistent messages which will NOT be committed and so should be 'lost' sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 1*1024, 5); + messageProducer.close(); + session.close(); + + session = connection.createSession(true, Session.SESSION_TRANSACTED); + // Create a priority queue on broker + final Map<String,Object> priorityQueueArguments = new HashMap<String, Object>(); + priorityQueueArguments.put("x-qpid-priorities",10); + createAndBindQueueOnBroker(session, PRIORITY_QUEUE_NAME, priorityQueueArguments); + + // Create a queue that has a DLQ + final Map<String,Object> queueWithDLQArguments = new HashMap<String, Object>(); + queueWithDLQArguments.put("x-qpid-dlq-enabled", true); + queueWithDLQArguments.put("x-qpid-maximum-delivery-count", 2); + createAndBindQueueOnBroker(session, QUEUE_WITH_DLQ_NAME, queueWithDLQArguments); + + // Send message to the DLQ + Queue dlq = session.createQueue("fanout://" + QUEUE_WITH_DLQ_NAME + "_DLE//does-not-matter"); + MessageProducer dlqMessageProducer = session.createProducer(dlq); + sendMessages(session, dlqMessageProducer, dlq, DeliveryMode.PERSISTENT, 1*1024, 1); + session.commit(); + // Create a queue with JMX specifying an owner, so it can later be moved into description + createAndBindQueueOnBrokerWithJMX(NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, MISUSED_OWNER, priorityQueueArguments); session.close(); connection.close(); } + private void createAndBindQueueOnBroker(Session session, String queueName, final Map<String, Object> arguments) throws Exception + { + ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), false, true, false, arguments); + Queue queue = (Queue) session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='true'"); + ((AMQSession<?,?>) session).declareAndBind((AMQDestination)queue); + } + + private void createAndBindQueueOnBrokerWithJMX(String queueName, String owner, final Map<String, Object> arguments) throws Exception + { + Map<String, Object> environment = new HashMap<String, Object>(); + environment.put(JMXConnector.CREDENTIALS, new String[] {"admin","admin"}); + JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:8999/jmxrmi"); + JMXConnector jmxConnector = JMXConnectorFactory.connect(url, environment); + MBeanServerConnection mbsc = jmxConnector.getMBeanServerConnection(); + ObjectName virtualHost = new ObjectName("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"test\""); + + Object[] params = new Object[] {queueName, owner, true, arguments}; + String[] signature = new String[] {String.class.getName(), String.class.getName(), boolean.class.getName(), Map.class.getName()}; + mbsc.invoke(virtualHost, "createNewQueue", params, signature); + + ObjectName directExchange = new ObjectName("org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=\"test\",name=\"amq.direct\",ExchangeType=direct"); + mbsc.invoke(directExchange, "createNewBinding", new Object[] {queueName, queueName}, new String[] {String.class.getName(), String.class.getName()}); + } /** * Prepare a DurableSubscription backing queue for use in testing selector * recovery and queue exclusivity marking during the upgrade process. |