diff options
author | Martin Ritchie <ritchiem@apache.org> | 2008-08-07 15:37:36 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2008-08-07 15:37:36 +0000 |
commit | c53cf3df85bb5a37ec7ec000667cba4012c35e85 (patch) | |
tree | 1eeb3f4c1c663a1d1831bf0b794488d98a237c78 | |
parent | 87317fb32beea5c78506afce0be739c2a90b098e (diff) | |
download | qpid-python-c53cf3df85bb5a37ec7ec000667cba4012c35e85.tar.gz |
QPID-1195 , QPID-1193 Initial changes to allow bind and queue arguments to be stored and recovered from the MessageStore. Created a test to validate that the stored values can be recovered. DerbyStore hasn't fully been implemented. Surrounding work has been done and tested with BDBMessageStore.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@683632 13f79535-47bb-0310-9956-ffa450edef68
16 files changed, 776 insertions, 66 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index 88d5360f3e..6312aed5bf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -180,7 +180,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr null); if (queue.isDurable() && !queue.isAutoDelete()) { - _messageStore.createQueue(queue); + _messageStore.createQueue(queue, null); } Configuration virtualHostDefaultQueueConfiguration = diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index bd3e5b1f72..984106277f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -197,7 +197,7 @@ public class VirtualHostConfiguration if (queue.isDurable()) { - messageStore.createQueue(queue); + messageStore.createQueue(queue, null); } queueRegistry.registerQueue(queue); diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index 616f47bd24..e39c005750 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -166,11 +166,19 @@ public class DirectExchange extends AbstractExchange assert routingKey != null; if (!_index.add(routingKey, queue)) { - _logger.debug("Queue " + queue + " is already registered with routing key " + routingKey); + if (_logger.isDebugEnabled()) + { + _logger.debug("Queue (" + queue.getName() + ")" + queue + " is already registered with routing key " + routingKey); + } } else { - _logger.debug("Binding queue " + queue + " with routing key " + routingKey + " to exchange " + this); + if (_logger.isDebugEnabled()) + { + _logger.debug("Binding queue(" + queue.getName() + ") " + queue + " with routing key " + routingKey + + (args == null ? "" : " and arguments " + args.toString()) + + " to exchange " + this); + } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 379ec7a7d6..447482ccf3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -117,7 +117,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar queue = createQueue(queueName, body, virtualHost, session); if (queue.isDurable() && !queue.isAutoDelete()) { - store.createQueue(queue); + store.createQueue(queue, body.getArguments()); } queueRegistry.registerQueue(queue); if (autoRegister) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java index ba6b392d13..e14ed0f41d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java @@ -39,6 +39,11 @@ public class AMQPriorityQueue extends SimpleAMQQueue super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueList.Factory(priorities)); } + public int getPriorities() + { + return ((PriorityQueueList) _entries).getPriorities(); + } + @Override protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry) { @@ -63,5 +68,4 @@ public class AMQPriorityQueue extends SimpleAMQQueue } } - } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index f3e4e7c28b..f7bc2ddafa 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -52,6 +52,7 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException; + List<ExchangeBinding> getExchangeBindings(); void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 431b76754f..9dfc4449bb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -28,7 +28,7 @@ import org.apache.qpid.AMQException; public class AMQQueueFactory { - private static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities"); + public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities"); public static AMQQueue createAMQQueueImpl(AMQShortString name, boolean durable, diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBinding.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBinding.java new file mode 100644 index 0000000000..a2fcab9e73 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBinding.java @@ -0,0 +1,84 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.AMQException; + +public class ExchangeBinding +{ + private final Exchange _exchange; + private final AMQShortString _routingKey; + private final FieldTable _arguments; + + private static final FieldTable EMPTY_ARGUMENTS = new FieldTable(); + + ExchangeBinding(AMQShortString routingKey, Exchange exchange) + { + this(routingKey, exchange, EMPTY_ARGUMENTS); + } + + ExchangeBinding(AMQShortString routingKey, Exchange exchange, FieldTable arguments) + { + _routingKey = routingKey == null ? AMQShortString.EMPTY_STRING : routingKey; + _exchange = exchange; + _arguments = arguments == null ? EMPTY_ARGUMENTS : arguments; + } + + void unbind(AMQQueue queue) throws AMQException + { + _exchange.deregisterQueue(_routingKey, queue, _arguments); + } + + public Exchange getExchange() + { + return _exchange; + } + + public AMQShortString getRoutingKey() + { + return _routingKey; + } + + public FieldTable getArguments() + { + return _arguments; + } + + public int hashCode() + { + return (_exchange == null ? 0 : _exchange.hashCode()) + + (_routingKey == null ? 0 : _routingKey.hashCode()); + } + + public boolean equals(Object o) + { + if (!(o instanceof ExchangeBinding)) + { + return false; + } + ExchangeBinding eb = (ExchangeBinding) o; + return _exchange.equals(eb._exchange) + && _routingKey.equals(eb._routingKey); + } +}
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java index d2e5a02508..fb839c1783 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java @@ -36,59 +36,6 @@ import org.apache.qpid.server.exchange.Exchange; */ class ExchangeBindings { - private static final FieldTable EMPTY_ARGUMENTS = new FieldTable(); - - static class ExchangeBinding - { - private final Exchange _exchange; - private final AMQShortString _routingKey; - private final FieldTable _arguments; - - ExchangeBinding(AMQShortString routingKey, Exchange exchange) - { - this(routingKey, exchange, EMPTY_ARGUMENTS); - } - - ExchangeBinding(AMQShortString routingKey, Exchange exchange, FieldTable arguments) - { - _routingKey = routingKey == null ? AMQShortString.EMPTY_STRING : routingKey; - _exchange = exchange; - _arguments = arguments == null ? EMPTY_ARGUMENTS : arguments; - } - - void unbind(AMQQueue queue) throws AMQException - { - _exchange.deregisterQueue(_routingKey, queue, _arguments); - } - - public Exchange getExchange() - { - return _exchange; - } - - public AMQShortString getRoutingKey() - { - return _routingKey; - } - - public int hashCode() - { - return (_exchange == null ? 0 : _exchange.hashCode()) - + (_routingKey == null ? 0 : _routingKey.hashCode()); - } - - public boolean equals(Object o) - { - if (!(o instanceof ExchangeBinding)) - { - return false; - } - ExchangeBinding eb = (ExchangeBinding) o; - return _exchange.equals(eb._exchange) - && _routingKey.equals(eb._routingKey); - } - } - private final List<ExchangeBinding> _bindings = new CopyOnWriteArrayList<ExchangeBinding>(); private final AMQQueue _queue; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java index e6628832cb..fd46a8a5ff 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java @@ -42,6 +42,11 @@ public class PriorityQueueList implements QueueEntryList } } + public int getPriorities() + { + return _priorities; + } + public AMQQueue getQueue() { return _queue; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 4b7da30800..1674c26232 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -82,7 +82,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private volatile Subscription _exclusiveSubscriber; - private final QueueEntryList _entries; + protected final QueueEntryList _entries; private final AMQQueueMBean _managedObject; private final Executor _asyncDelivery; @@ -223,6 +223,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } + public List<ExchangeBinding> getExchangeBindings() + { + return new ArrayList<ExchangeBinding>(_bindings.getExchangeBindings()); + } + // ------ Manage Subscriptions public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 9d22e2b929..bfbba8c00f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -728,7 +728,7 @@ public class DerbyMessageStore implements MessageStore } - public void createQueue(AMQQueue queue) throws AMQException + public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException { _logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called"); @@ -1281,6 +1281,11 @@ public class DerbyMessageStore implements MessageStore } + public boolean isPersistent() + { + return true; + } + private void checkNotClosed() throws MessageStoreClosedException { if (_closed.get()) diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index b02eff957e..f8d8404b89 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -121,7 +121,7 @@ public class MemoryMessageStore implements MessageStore } - public void createQueue(AMQQueue queue) throws AMQException + public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException { // Not required to do anything } @@ -213,7 +213,12 @@ public class MemoryMessageStore implements MessageStore return bodyList.get(index); } - private void checkNotClosed() throws MessageStoreClosedException + public boolean isPersistent() + { + return false; + } + + private void checkNotClosed() throws MessageStoreClosedException { if (_closed.get()) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java index e15e69a414..9e855bcc09 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -131,9 +131,10 @@ public interface MessageStore * * @param queue The queue to store. * + * @param arguments The additional arguments to the binding * @throws AMQException If the operation fails for any reason. */ - void createQueue(AMQQueue queue) throws AMQException; + void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException; /** * Removes the specified queue from the persistent store. @@ -255,4 +256,12 @@ public interface MessageStore * @throws AMQException If the operation fails for any reason, or if the specified message does not exist. */ ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException; + + /** + * Is this store capable of persisting the data + * + * @return true if this store is capable of persisting data + */ + boolean isPersistent(); + } diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java new file mode 100644 index 0000000000..932486d954 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -0,0 +1,632 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import junit.framework.TestCase; +import org.apache.qpid.server.exchange.DirectExchange; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.ExchangeType; +import org.apache.qpid.server.exchange.TopicExchange; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.IncomingMessage; +import org.apache.qpid.server.queue.MessageHandleFactory; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.AMQPriorityQueue; +import org.apache.qpid.server.queue.SimpleAMQQueue; +import org.apache.qpid.server.queue.ExchangeBinding; +import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.protocol.InternalTestProtocolSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.AMQException; +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.List; + +/** + * This tests the MessageStores by using the available interfaces. + * + * This test validates that Exchanges, Queues, Bindings and Messages are persisted correctly. + */ +public class MessageStoreTest extends TestCase +{ + + private static final int DEFAULT_PRIORTY_LEVEL = 5; + private static final Logger _logger = LoggerFactory.getLogger(MessageStoreTest.class); + + public void testMemoryMessageStore() + { + + PropertiesConfiguration config = new PropertiesConfiguration(); + + config.addProperty("store.class", "org.apache.qpid.server.store.MemoryMessageStore"); + + runTestWithStore(config); + } + + public void DISABLE_testDerbyMessageStore() + { + PropertiesConfiguration config = new PropertiesConfiguration(); + + config.addProperty("store.environment-path", "derbyDB_MST"); + config.addProperty("store.class", "org.apache.qpid.server.store.DerbyMessageStore"); + + runTestWithStore(config); + } + + private void reload(Configuration configuration) + { + if (_virtualHost != null) + { + try + { + _virtualHost.close(); + } + catch (Exception e) + { + fail(e.getMessage()); + } + } + + try + { + _virtualHost = new VirtualHost(virtualHostName, configuration, null); + } + catch (Exception e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + VirtualHost _virtualHost = null; + String virtualHostName = "MessageStoreTest"; + + AMQShortString nonDurableExchangeName = new AMQShortString("MST-NonDurableDirectExchange"); + AMQShortString directExchangeName = new AMQShortString("MST-DirectExchange"); + AMQShortString topicExchangeName = new AMQShortString("MST-TopicExchange"); + AMQShortString queueOwner = new AMQShortString("MST"); + + AMQShortString durablePriorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue-Durable"); + AMQShortString durableTopicQueueName = new AMQShortString("MST-TopicQueue-Durable"); + AMQShortString priorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue"); + AMQShortString topicQueueName = new AMQShortString("MST-TopicQueue"); + + AMQShortString durablePriorityQueueName = new AMQShortString("MST-PriorityQueue-Durable"); + AMQShortString durableQueueName = new AMQShortString("MST-Queue-Durable"); + AMQShortString priorityQueueName = new AMQShortString("MST-PriorityQueue"); + AMQShortString queueName = new AMQShortString("MST-Queue"); + + AMQShortString directRouting = new AMQShortString("MST-direct"); + AMQShortString topicRouting = new AMQShortString("MST-topic"); + + protected void runTestWithStore(Configuration configuration) + { + //Ensure Environment Path is empty + cleanup(configuration); + + //Load the Virtualhost with the required MessageStore + reload(configuration); + + MessageStore messageStore = _virtualHost.getMessageStore(); + + createAllQueues(); + createAllTopicQueues(); + + //Register Non-Durable DirectExchange + Exchange nonDurableExchange = createExchange(DirectExchange.TYPE, nonDurableExchangeName, false); + bindAllQueuesToExchange(nonDurableExchange, directRouting); + + //Register DirectExchange + Exchange directExchange = createExchange(DirectExchange.TYPE, directExchangeName, true); + bindAllQueuesToExchange(directExchange, directRouting); + + //Register TopicExchange + Exchange topicExchange = createExchange(TopicExchange.TYPE, topicExchangeName, true); + bindAllTopicQueuesToExchange(topicExchange, topicRouting); + + //Send Message To NonDurable direct Exchange = persistent + sendMessageOnExchange(nonDurableExchange, directRouting, true); + // and non-persistent + sendMessageOnExchange(nonDurableExchange, directRouting, false); + + //Send Message To direct Exchange = persistent + sendMessageOnExchange(directExchange, directRouting, true); + // and non-persistent + sendMessageOnExchange(directExchange, directRouting, false); + + //Send Message To topic Exchange = persistent + sendMessageOnExchange(topicExchange, topicRouting, true); + // and non-persistent + sendMessageOnExchange(topicExchange, topicRouting, false); + + //Ensure all the Queues have four messages (one transient, one persistent) x 2 exchange routings + validateMessageOnQueues(4, true); + //Ensure all the topics have two messages (one transient, one persistent) + validateMessageOnTopics(2, true); + + assertEquals("Not all queues correctly registered", 8, _virtualHost.getQueueRegistry().getQueues().size()); + + if (!messageStore.isPersistent()) + { + _logger.warn("Unable to test Persistent capabilities of messages store(" + messageStore.getClass() + ") as it is not capable of peristence."); + return; + } + + //Reload the Virtualhost to test persistence + _logger.info("Reloading Virtualhost"); + + VirtualHost original = _virtualHost; + + reload(configuration); + + assertTrue("Virtualhost has not been reloaded", original != _virtualHost); + + validateExchanges(); + + //Validate Durable Queues still have the persistentn message + validateMessageOnQueues(2, false); + //Validate Durable Queues still have the persistentn message + validateMessageOnTopics(1, false); + + //Validate Properties of Binding + validateBindingProperties(); + + //Validate Properties of Queues + validateQueueProperties(); + + //Validate Non-Durable Queues are gone. + assertNull("Non-Durable queue still registered:" + priorityQueueName, _virtualHost.getQueueRegistry().getQueue(priorityQueueName)); + assertNull("Non-Durable queue still registered:" + queueName, _virtualHost.getQueueRegistry().getQueue(queueName)); + assertNull("Non-Durable queue still registered:" + priorityTopicQueueName, _virtualHost.getQueueRegistry().getQueue(priorityTopicQueueName)); + assertNull("Non-Durable queue still registered:" + topicQueueName, _virtualHost.getQueueRegistry().getQueue(topicQueueName)); + + assertEquals("Not all queues correctly registered", 4, _virtualHost.getQueueRegistry().getQueues().size()); + } + + private void validateExchanges() + { + ExchangeRegistry registry = _virtualHost.getExchangeRegistry(); + + assertTrue(directExchangeName + " exchange NOT reloaded after failover", + registry.getExchangeNames().contains(directExchangeName)); + assertTrue(topicExchangeName + " exchange NOT reloaded after failover", + registry.getExchangeNames().contains(topicExchangeName)); + assertTrue(nonDurableExchangeName + " exchange reloaded after failover", + !registry.getExchangeNames().contains(nonDurableExchangeName)); + + // There are 5 required exchanges + our 2 durable queues + assertEquals("Incorrect number of exchanges available", 5 + 2, registry.getExchangeNames().size()); + } + + /** Validates that the Durable queues */ + private void validateBindingProperties() + { + QueueRegistry queueRegistry = _virtualHost.getQueueRegistry(); + + validateBindingProperties(queueRegistry.getQueue(durablePriorityQueueName).getExchangeBindings(), false); + validateBindingProperties(queueRegistry.getQueue(durablePriorityTopicQueueName).getExchangeBindings(), true); + validateBindingProperties(queueRegistry.getQueue(durableQueueName).getExchangeBindings(), false); + validateBindingProperties(queueRegistry.getQueue(durableTopicQueueName).getExchangeBindings(), true); + } + + /** + * Validate that each queue is bound once. + * + * @param bindings the set of bindings to validate + * @param useSelectors if set validate that the binding has a JMS_SELECTOR argument + */ + private void validateBindingProperties(List<ExchangeBinding> bindings, boolean useSelectors) + { + assertEquals("Each queue should only be bound once.", 1, bindings.size()); + + ExchangeBinding binding = bindings.get(0); + + if (useSelectors) + { + assertTrue("Binding does not contain a Selector argument.", + binding.getArguments().containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue())); + } + } + + private void validateQueueProperties() + { + QueueRegistry queueRegistry = _virtualHost.getQueueRegistry(); + + validateQueueProperties(queueRegistry.getQueue(durablePriorityQueueName), true); + validateQueueProperties(queueRegistry.getQueue(durablePriorityTopicQueueName), true); + validateQueueProperties(queueRegistry.getQueue(durableQueueName), false); + validateQueueProperties(queueRegistry.getQueue(durableTopicQueueName), false); + + } + + private void validateQueueProperties(AMQQueue queue, boolean usePriority) + { + if (usePriority) + { + assertEquals("Queue is no longer a Priority Queue", AMQPriorityQueue.class, queue.getClass()); + assertEquals("Priority Queue does not have set priorities", DEFAULT_PRIORTY_LEVEL, ((AMQPriorityQueue) queue).getPriorities()); + } + else + { + assertEquals("Queue is no longer a Priority Queue", SimpleAMQQueue.class, queue.getClass()); + } + } + + /** + * Delete the Store Environment path + * + * @param configuration The configuration that contains the store environment path. + */ + private void cleanup(Configuration configuration) + { + + String environment = configuration.getString("store.environment-path"); + + if (environment != null) + { + File environmentPath = new File(environment); + + if (environmentPath.exists()) + { + deleteDirectory(environmentPath); + } + } + } + + private void deleteDirectory(File path) + { + if (path.isDirectory()) + { + for (File file : path.listFiles()) + { + deleteDirectory(file); + } + } + else + { + path.delete(); + } + } + + private void sendMessageOnExchange(Exchange directExchange, AMQShortString routingKey, boolean deliveryMode) + { + //Set MessagePersustebce + BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); + properties.setDeliveryMode(deliveryMode ? Integer.valueOf(2).byteValue() : Integer.valueOf(1).byteValue()); + FieldTable headers = properties.getHeaders(); + headers.setString("Test", "MST"); + properties.setHeaders(headers); + + MessagePublishInfo messageInfo = new TestMessagePublishInfo(directExchange, false, false, routingKey); + + IncomingMessage currentMessage = null; + + try + { + currentMessage = new IncomingMessage(_virtualHost.getMessageStore().getNewMessageId(), + messageInfo, + new NonTransactionalContext(_virtualHost.getMessageStore(), + new StoreContext(), null, null), + new InternalTestProtocolSession()); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + + currentMessage.setMessageStore(_virtualHost.getMessageStore()); + currentMessage.setExchange(directExchange); + + ContentHeaderBody headerBody = new ContentHeaderBody(); + headerBody.classId = BasicConsumeBodyImpl.CLASS_ID; + headerBody.bodySize = 0; + + headerBody.properties = properties; + + try + { + currentMessage.setContentHeaderBody(headerBody); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + + currentMessage.setExpiration(); + + try + { + currentMessage.route(); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + + try + { + currentMessage.routingComplete(_virtualHost.getMessageStore(), new MessageHandleFactory()); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + + // check and deliver if header says body length is zero + if (currentMessage.allContentReceived()) + { + try + { + currentMessage.deliverToQueues(); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + } + } + + private void createAllQueues() + { + //Register Durable Priority Queue + createQueue(durablePriorityQueueName, true, true); + + //Register Durable Simple Queue + createQueue(durableQueueName, false, true); + + //Register NON-Durable Priority Queue + createQueue(priorityQueueName, true, false); + + //Register NON-Durable Simple Queue + createQueue(queueName, false, false); + } + + private void createAllTopicQueues() + { + //Register Durable Priority Queue + createQueue(durablePriorityTopicQueueName, true, true); + + //Register Durable Simple Queue + createQueue(durableTopicQueueName, false, true); + + //Register NON-Durable Priority Queue + createQueue(priorityTopicQueueName, true, false); + + //Register NON-Durable Simple Queue + createQueue(topicQueueName, false, false); + } + + private Exchange createExchange(ExchangeType type, AMQShortString name, boolean durable) + { + Exchange exchange = null; + + try + { + exchange = type.newInstance(_virtualHost, name, durable, 0, false); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + + try + { + _virtualHost.getExchangeRegistry().registerExchange(exchange); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + return exchange; + } + + private void createQueue(AMQShortString queueName, boolean usePriority, boolean durable) + { + + FieldTable queueArguments = null; + + if (usePriority) + { + queueArguments = new FieldTable(); + queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL); + } + + AMQQueue queue = null; + + //Ideally we would be able to use the QueueDeclareHandler here. + try + { + queue = AMQQueueFactory.createAMQQueueImpl(queueName, durable, queueOwner, false, _virtualHost, + queueArguments); + + validateQueueProperties(queue, usePriority); + + if (queue.isDurable() && !queue.isAutoDelete()) + { + _virtualHost.getMessageStore().createQueue(queue, queueArguments); + } + } + catch (AMQException e) + { + fail(e.getMessage()); + } + + try + { + _virtualHost.getQueueRegistry().registerQueue(queue); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + + } + + private void bindAllQueuesToExchange(Exchange exchange, AMQShortString routingKey) + { + FieldTable queueArguments = new FieldTable(); + queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL); + + QueueRegistry queueRegistry = _virtualHost.getQueueRegistry(); + + bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durablePriorityQueueName), false, queueArguments); + bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableQueueName), false, null); + bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(priorityQueueName), false, queueArguments); + bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(queueName), false, null); + } + + private void bindAllTopicQueuesToExchange(Exchange exchange, AMQShortString routingKey) + { + FieldTable queueArguments = new FieldTable(); + queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL); + + QueueRegistry queueRegistry = _virtualHost.getQueueRegistry(); + + bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durablePriorityTopicQueueName), true, queueArguments); + bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableTopicQueueName), true, null); + bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(priorityTopicQueueName), true, queueArguments); + bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(topicQueueName), true, null); + } + + + protected void bindQueueToExchange(Exchange exchange, AMQShortString routingKey, AMQQueue queue, boolean useSelector, FieldTable queueArguments) + { + try + { + exchange.registerQueue(queueName, queue, queueArguments); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + + FieldTable bindArguments = null; + + if (useSelector) + { + bindArguments = new FieldTable(); + bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), "Test = 'MST'"); + } + + try + { + queue.bind(exchange, routingKey, bindArguments); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + } + + private void validateMessage(long messageCount, boolean allQueues) + { + validateMessageOnTopics(messageCount, allQueues); + validateMessageOnQueues(messageCount, allQueues); + } + + private void validateMessageOnTopics(long messageCount, boolean allQueues) + { + validateMessageOnQueue(durablePriorityTopicQueueName, messageCount); + validateMessageOnQueue(durableTopicQueueName, messageCount); + + if (allQueues) + { + validateMessageOnQueue(priorityTopicQueueName, messageCount); + validateMessageOnQueue(topicQueueName, messageCount); + } + } + + private void validateMessageOnQueues(long messageCount, boolean allQueues) + { + validateMessageOnQueue(durablePriorityQueueName, messageCount); + validateMessageOnQueue(durableQueueName, messageCount); + + if (allQueues) + { + validateMessageOnQueue(priorityQueueName, messageCount); + validateMessageOnQueue(queueName, messageCount); + } + } + + private void validateMessageOnQueue(AMQShortString queueName, long messageCount) + { + AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueName); + + assertNotNull("Queue(" + queueName + ") not correctly registered:", queue); + + assertEquals("Incorrect Message count on queue:" + queueName, messageCount, queue.getMessageCount()); + } + + private class TestMessagePublishInfo implements MessagePublishInfo + { + + Exchange _exchange; + boolean _immediate; + boolean _mandatory; + AMQShortString _routingKey; + + TestMessagePublishInfo(Exchange exchange, boolean immediate, boolean mandatory, AMQShortString routingKey) + { + _exchange = exchange; + _immediate = immediate; + _mandatory = mandatory; + _routingKey = routingKey; + } + + public AMQShortString getExchange() + { + return _exchange.getName(); + } + + public void setExchange(AMQShortString exchange) + { + //no-op + } + + public boolean isImmediate() + { + return _immediate; + } + + public boolean isMandatory() + { + return _mandatory; + } + + public AMQShortString getRoutingKey() + { + return _routingKey; + } + } +}
\ No newline at end of file diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java index 792744903e..e2be503926 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -78,7 +78,7 @@ public class SkeletonMessageStore implements MessageStore //To change body of implemented methods use File | Settings | File Templates. } - public void createQueue(AMQQueue queue) throws AMQException + public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException { } @@ -129,6 +129,11 @@ public class SkeletonMessageStore implements MessageStore return null; } + public boolean isPersistent() + { + return false; + } + public void removeQueue(final AMQQueue queue) throws AMQException { |