summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2008-08-07 15:37:36 +0000
committerMartin Ritchie <ritchiem@apache.org>2008-08-07 15:37:36 +0000
commitc53cf3df85bb5a37ec7ec000667cba4012c35e85 (patch)
tree1eeb3f4c1c663a1d1831bf0b794488d98a237c78
parent87317fb32beea5c78506afce0be739c2a90b098e (diff)
downloadqpid-python-c53cf3df85bb5a37ec7ec000667cba4012c35e85.tar.gz
QPID-1195 , QPID-1193 Initial changes to allow bind and queue arguments to be stored and recovered from the MessageStore. Created a test to validate that the stored values can be recovered. DerbyStore hasn't fully been implemented. Surrounding work has been done and tested with BDBMessageStore.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@683632 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBinding.java84
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java53
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java11
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java632
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java7
16 files changed, 776 insertions, 66 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
index 88d5360f3e..6312aed5bf 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
@@ -180,7 +180,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
null);
if (queue.isDurable() && !queue.isAutoDelete())
{
- _messageStore.createQueue(queue);
+ _messageStore.createQueue(queue, null);
}
Configuration virtualHostDefaultQueueConfiguration =
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index bd3e5b1f72..984106277f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -197,7 +197,7 @@ public class VirtualHostConfiguration
if (queue.isDurable())
{
- messageStore.createQueue(queue);
+ messageStore.createQueue(queue, null);
}
queueRegistry.registerQueue(queue);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
index 616f47bd24..e39c005750 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
@@ -166,11 +166,19 @@ public class DirectExchange extends AbstractExchange
assert routingKey != null;
if (!_index.add(routingKey, queue))
{
- _logger.debug("Queue " + queue + " is already registered with routing key " + routingKey);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Queue (" + queue.getName() + ")" + queue + " is already registered with routing key " + routingKey);
+ }
}
else
{
- _logger.debug("Binding queue " + queue + " with routing key " + routingKey + " to exchange " + this);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Binding queue(" + queue.getName() + ") " + queue + " with routing key " + routingKey
+ + (args == null ? "" : " and arguments " + args.toString())
+ + " to exchange " + this);
+ }
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 379ec7a7d6..447482ccf3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -117,7 +117,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
queue = createQueue(queueName, body, virtualHost, session);
if (queue.isDurable() && !queue.isAutoDelete())
{
- store.createQueue(queue);
+ store.createQueue(queue, body.getArguments());
}
queueRegistry.registerQueue(queue);
if (autoRegister)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
index ba6b392d13..e14ed0f41d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
@@ -39,6 +39,11 @@ public class AMQPriorityQueue extends SimpleAMQQueue
super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueList.Factory(priorities));
}
+ public int getPriorities()
+ {
+ return ((PriorityQueueList) _entries).getPriorities();
+ }
+
@Override
protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
{
@@ -63,5 +68,4 @@ public class AMQPriorityQueue extends SimpleAMQQueue
}
}
-
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index f3e4e7c28b..f7bc2ddafa 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -52,6 +52,7 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException;
+ List<ExchangeBinding> getExchangeBindings();
void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index 431b76754f..9dfc4449bb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -28,7 +28,7 @@ import org.apache.qpid.AMQException;
public class AMQQueueFactory
{
- private static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
+ public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
public static AMQQueue createAMQQueueImpl(AMQShortString name,
boolean durable,
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBinding.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBinding.java
new file mode 100644
index 0000000000..a2fcab9e73
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBinding.java
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.AMQException;
+
+public class ExchangeBinding
+{
+ private final Exchange _exchange;
+ private final AMQShortString _routingKey;
+ private final FieldTable _arguments;
+
+ private static final FieldTable EMPTY_ARGUMENTS = new FieldTable();
+
+ ExchangeBinding(AMQShortString routingKey, Exchange exchange)
+ {
+ this(routingKey, exchange, EMPTY_ARGUMENTS);
+ }
+
+ ExchangeBinding(AMQShortString routingKey, Exchange exchange, FieldTable arguments)
+ {
+ _routingKey = routingKey == null ? AMQShortString.EMPTY_STRING : routingKey;
+ _exchange = exchange;
+ _arguments = arguments == null ? EMPTY_ARGUMENTS : arguments;
+ }
+
+ void unbind(AMQQueue queue) throws AMQException
+ {
+ _exchange.deregisterQueue(_routingKey, queue, _arguments);
+ }
+
+ public Exchange getExchange()
+ {
+ return _exchange;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return _routingKey;
+ }
+
+ public FieldTable getArguments()
+ {
+ return _arguments;
+ }
+
+ public int hashCode()
+ {
+ return (_exchange == null ? 0 : _exchange.hashCode())
+ + (_routingKey == null ? 0 : _routingKey.hashCode());
+ }
+
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof ExchangeBinding))
+ {
+ return false;
+ }
+ ExchangeBinding eb = (ExchangeBinding) o;
+ return _exchange.equals(eb._exchange)
+ && _routingKey.equals(eb._routingKey);
+ }
+} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
index d2e5a02508..fb839c1783 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
@@ -36,59 +36,6 @@ import org.apache.qpid.server.exchange.Exchange;
*/
class ExchangeBindings
{
- private static final FieldTable EMPTY_ARGUMENTS = new FieldTable();
-
- static class ExchangeBinding
- {
- private final Exchange _exchange;
- private final AMQShortString _routingKey;
- private final FieldTable _arguments;
-
- ExchangeBinding(AMQShortString routingKey, Exchange exchange)
- {
- this(routingKey, exchange, EMPTY_ARGUMENTS);
- }
-
- ExchangeBinding(AMQShortString routingKey, Exchange exchange, FieldTable arguments)
- {
- _routingKey = routingKey == null ? AMQShortString.EMPTY_STRING : routingKey;
- _exchange = exchange;
- _arguments = arguments == null ? EMPTY_ARGUMENTS : arguments;
- }
-
- void unbind(AMQQueue queue) throws AMQException
- {
- _exchange.deregisterQueue(_routingKey, queue, _arguments);
- }
-
- public Exchange getExchange()
- {
- return _exchange;
- }
-
- public AMQShortString getRoutingKey()
- {
- return _routingKey;
- }
-
- public int hashCode()
- {
- return (_exchange == null ? 0 : _exchange.hashCode())
- + (_routingKey == null ? 0 : _routingKey.hashCode());
- }
-
- public boolean equals(Object o)
- {
- if (!(o instanceof ExchangeBinding))
- {
- return false;
- }
- ExchangeBinding eb = (ExchangeBinding) o;
- return _exchange.equals(eb._exchange)
- && _routingKey.equals(eb._routingKey);
- }
- }
-
private final List<ExchangeBinding> _bindings = new CopyOnWriteArrayList<ExchangeBinding>();
private final AMQQueue _queue;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
index e6628832cb..fd46a8a5ff 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
@@ -42,6 +42,11 @@ public class PriorityQueueList implements QueueEntryList
}
}
+ public int getPriorities()
+ {
+ return _priorities;
+ }
+
public AMQQueue getQueue()
{
return _queue;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 4b7da30800..1674c26232 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -82,7 +82,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private volatile Subscription _exclusiveSubscriber;
- private final QueueEntryList _entries;
+ protected final QueueEntryList _entries;
private final AMQQueueMBean _managedObject;
private final Executor _asyncDelivery;
@@ -223,6 +223,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
+ public List<ExchangeBinding> getExchangeBindings()
+ {
+ return new ArrayList<ExchangeBinding>(_bindings.getExchangeBindings());
+ }
+
// ------ Manage Subscriptions
public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index 9d22e2b929..bfbba8c00f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -728,7 +728,7 @@ public class DerbyMessageStore implements MessageStore
}
- public void createQueue(AMQQueue queue) throws AMQException
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
{
_logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called");
@@ -1281,6 +1281,11 @@ public class DerbyMessageStore implements MessageStore
}
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
private void checkNotClosed() throws MessageStoreClosedException
{
if (_closed.get())
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index b02eff957e..f8d8404b89 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -121,7 +121,7 @@ public class MemoryMessageStore implements MessageStore
}
- public void createQueue(AMQQueue queue) throws AMQException
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
{
// Not required to do anything
}
@@ -213,7 +213,12 @@ public class MemoryMessageStore implements MessageStore
return bodyList.get(index);
}
- private void checkNotClosed() throws MessageStoreClosedException
+ public boolean isPersistent()
+ {
+ return false;
+ }
+
+ private void checkNotClosed() throws MessageStoreClosedException
{
if (_closed.get())
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
index e15e69a414..9e855bcc09 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -131,9 +131,10 @@ public interface MessageStore
*
* @param queue The queue to store.
*
+ * @param arguments The additional arguments to the binding
* @throws AMQException If the operation fails for any reason.
*/
- void createQueue(AMQQueue queue) throws AMQException;
+ void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException;
/**
* Removes the specified queue from the persistent store.
@@ -255,4 +256,12 @@ public interface MessageStore
* @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
*/
ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
+
+ /**
+ * Is this store capable of persisting the data
+ *
+ * @return true if this store is capable of persisting data
+ */
+ boolean isPersistent();
+
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
new file mode 100644
index 0000000000..932486d954
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -0,0 +1,632 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import junit.framework.TestCase;
+import org.apache.qpid.server.exchange.DirectExchange;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeType;
+import org.apache.qpid.server.exchange.TopicExchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQPriorityQueue;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
+import org.apache.qpid.server.queue.ExchangeBinding;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+
+/**
+ * This tests the MessageStores by using the available interfaces.
+ *
+ * This test validates that Exchanges, Queues, Bindings and Messages are persisted correctly.
+ */
+public class MessageStoreTest extends TestCase
+{
+
+ private static final int DEFAULT_PRIORTY_LEVEL = 5;
+ private static final Logger _logger = LoggerFactory.getLogger(MessageStoreTest.class);
+
+ public void testMemoryMessageStore()
+ {
+
+ PropertiesConfiguration config = new PropertiesConfiguration();
+
+ config.addProperty("store.class", "org.apache.qpid.server.store.MemoryMessageStore");
+
+ runTestWithStore(config);
+ }
+
+ public void DISABLE_testDerbyMessageStore()
+ {
+ PropertiesConfiguration config = new PropertiesConfiguration();
+
+ config.addProperty("store.environment-path", "derbyDB_MST");
+ config.addProperty("store.class", "org.apache.qpid.server.store.DerbyMessageStore");
+
+ runTestWithStore(config);
+ }
+
+ private void reload(Configuration configuration)
+ {
+ if (_virtualHost != null)
+ {
+ try
+ {
+ _virtualHost.close();
+ }
+ catch (Exception e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+ try
+ {
+ _virtualHost = new VirtualHost(virtualHostName, configuration, null);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ VirtualHost _virtualHost = null;
+ String virtualHostName = "MessageStoreTest";
+
+ AMQShortString nonDurableExchangeName = new AMQShortString("MST-NonDurableDirectExchange");
+ AMQShortString directExchangeName = new AMQShortString("MST-DirectExchange");
+ AMQShortString topicExchangeName = new AMQShortString("MST-TopicExchange");
+ AMQShortString queueOwner = new AMQShortString("MST");
+
+ AMQShortString durablePriorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue-Durable");
+ AMQShortString durableTopicQueueName = new AMQShortString("MST-TopicQueue-Durable");
+ AMQShortString priorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue");
+ AMQShortString topicQueueName = new AMQShortString("MST-TopicQueue");
+
+ AMQShortString durablePriorityQueueName = new AMQShortString("MST-PriorityQueue-Durable");
+ AMQShortString durableQueueName = new AMQShortString("MST-Queue-Durable");
+ AMQShortString priorityQueueName = new AMQShortString("MST-PriorityQueue");
+ AMQShortString queueName = new AMQShortString("MST-Queue");
+
+ AMQShortString directRouting = new AMQShortString("MST-direct");
+ AMQShortString topicRouting = new AMQShortString("MST-topic");
+
+ protected void runTestWithStore(Configuration configuration)
+ {
+ //Ensure Environment Path is empty
+ cleanup(configuration);
+
+ //Load the Virtualhost with the required MessageStore
+ reload(configuration);
+
+ MessageStore messageStore = _virtualHost.getMessageStore();
+
+ createAllQueues();
+ createAllTopicQueues();
+
+ //Register Non-Durable DirectExchange
+ Exchange nonDurableExchange = createExchange(DirectExchange.TYPE, nonDurableExchangeName, false);
+ bindAllQueuesToExchange(nonDurableExchange, directRouting);
+
+ //Register DirectExchange
+ Exchange directExchange = createExchange(DirectExchange.TYPE, directExchangeName, true);
+ bindAllQueuesToExchange(directExchange, directRouting);
+
+ //Register TopicExchange
+ Exchange topicExchange = createExchange(TopicExchange.TYPE, topicExchangeName, true);
+ bindAllTopicQueuesToExchange(topicExchange, topicRouting);
+
+ //Send Message To NonDurable direct Exchange = persistent
+ sendMessageOnExchange(nonDurableExchange, directRouting, true);
+ // and non-persistent
+ sendMessageOnExchange(nonDurableExchange, directRouting, false);
+
+ //Send Message To direct Exchange = persistent
+ sendMessageOnExchange(directExchange, directRouting, true);
+ // and non-persistent
+ sendMessageOnExchange(directExchange, directRouting, false);
+
+ //Send Message To topic Exchange = persistent
+ sendMessageOnExchange(topicExchange, topicRouting, true);
+ // and non-persistent
+ sendMessageOnExchange(topicExchange, topicRouting, false);
+
+ //Ensure all the Queues have four messages (one transient, one persistent) x 2 exchange routings
+ validateMessageOnQueues(4, true);
+ //Ensure all the topics have two messages (one transient, one persistent)
+ validateMessageOnTopics(2, true);
+
+ assertEquals("Not all queues correctly registered", 8, _virtualHost.getQueueRegistry().getQueues().size());
+
+ if (!messageStore.isPersistent())
+ {
+ _logger.warn("Unable to test Persistent capabilities of messages store(" + messageStore.getClass() + ") as it is not capable of peristence.");
+ return;
+ }
+
+ //Reload the Virtualhost to test persistence
+ _logger.info("Reloading Virtualhost");
+
+ VirtualHost original = _virtualHost;
+
+ reload(configuration);
+
+ assertTrue("Virtualhost has not been reloaded", original != _virtualHost);
+
+ validateExchanges();
+
+ //Validate Durable Queues still have the persistentn message
+ validateMessageOnQueues(2, false);
+ //Validate Durable Queues still have the persistentn message
+ validateMessageOnTopics(1, false);
+
+ //Validate Properties of Binding
+ validateBindingProperties();
+
+ //Validate Properties of Queues
+ validateQueueProperties();
+
+ //Validate Non-Durable Queues are gone.
+ assertNull("Non-Durable queue still registered:" + priorityQueueName, _virtualHost.getQueueRegistry().getQueue(priorityQueueName));
+ assertNull("Non-Durable queue still registered:" + queueName, _virtualHost.getQueueRegistry().getQueue(queueName));
+ assertNull("Non-Durable queue still registered:" + priorityTopicQueueName, _virtualHost.getQueueRegistry().getQueue(priorityTopicQueueName));
+ assertNull("Non-Durable queue still registered:" + topicQueueName, _virtualHost.getQueueRegistry().getQueue(topicQueueName));
+
+ assertEquals("Not all queues correctly registered", 4, _virtualHost.getQueueRegistry().getQueues().size());
+ }
+
+ private void validateExchanges()
+ {
+ ExchangeRegistry registry = _virtualHost.getExchangeRegistry();
+
+ assertTrue(directExchangeName + " exchange NOT reloaded after failover",
+ registry.getExchangeNames().contains(directExchangeName));
+ assertTrue(topicExchangeName + " exchange NOT reloaded after failover",
+ registry.getExchangeNames().contains(topicExchangeName));
+ assertTrue(nonDurableExchangeName + " exchange reloaded after failover",
+ !registry.getExchangeNames().contains(nonDurableExchangeName));
+
+ // There are 5 required exchanges + our 2 durable queues
+ assertEquals("Incorrect number of exchanges available", 5 + 2, registry.getExchangeNames().size());
+ }
+
+ /** Validates that the Durable queues */
+ private void validateBindingProperties()
+ {
+ QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
+
+ validateBindingProperties(queueRegistry.getQueue(durablePriorityQueueName).getExchangeBindings(), false);
+ validateBindingProperties(queueRegistry.getQueue(durablePriorityTopicQueueName).getExchangeBindings(), true);
+ validateBindingProperties(queueRegistry.getQueue(durableQueueName).getExchangeBindings(), false);
+ validateBindingProperties(queueRegistry.getQueue(durableTopicQueueName).getExchangeBindings(), true);
+ }
+
+ /**
+ * Validate that each queue is bound once.
+ *
+ * @param bindings the set of bindings to validate
+ * @param useSelectors if set validate that the binding has a JMS_SELECTOR argument
+ */
+ private void validateBindingProperties(List<ExchangeBinding> bindings, boolean useSelectors)
+ {
+ assertEquals("Each queue should only be bound once.", 1, bindings.size());
+
+ ExchangeBinding binding = bindings.get(0);
+
+ if (useSelectors)
+ {
+ assertTrue("Binding does not contain a Selector argument.",
+ binding.getArguments().containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()));
+ }
+ }
+
+ private void validateQueueProperties()
+ {
+ QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
+
+ validateQueueProperties(queueRegistry.getQueue(durablePriorityQueueName), true);
+ validateQueueProperties(queueRegistry.getQueue(durablePriorityTopicQueueName), true);
+ validateQueueProperties(queueRegistry.getQueue(durableQueueName), false);
+ validateQueueProperties(queueRegistry.getQueue(durableTopicQueueName), false);
+
+ }
+
+ private void validateQueueProperties(AMQQueue queue, boolean usePriority)
+ {
+ if (usePriority)
+ {
+ assertEquals("Queue is no longer a Priority Queue", AMQPriorityQueue.class, queue.getClass());
+ assertEquals("Priority Queue does not have set priorities", DEFAULT_PRIORTY_LEVEL, ((AMQPriorityQueue) queue).getPriorities());
+ }
+ else
+ {
+ assertEquals("Queue is no longer a Priority Queue", SimpleAMQQueue.class, queue.getClass());
+ }
+ }
+
+ /**
+ * Delete the Store Environment path
+ *
+ * @param configuration The configuration that contains the store environment path.
+ */
+ private void cleanup(Configuration configuration)
+ {
+
+ String environment = configuration.getString("store.environment-path");
+
+ if (environment != null)
+ {
+ File environmentPath = new File(environment);
+
+ if (environmentPath.exists())
+ {
+ deleteDirectory(environmentPath);
+ }
+ }
+ }
+
+ private void deleteDirectory(File path)
+ {
+ if (path.isDirectory())
+ {
+ for (File file : path.listFiles())
+ {
+ deleteDirectory(file);
+ }
+ }
+ else
+ {
+ path.delete();
+ }
+ }
+
+ private void sendMessageOnExchange(Exchange directExchange, AMQShortString routingKey, boolean deliveryMode)
+ {
+ //Set MessagePersustebce
+ BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
+ properties.setDeliveryMode(deliveryMode ? Integer.valueOf(2).byteValue() : Integer.valueOf(1).byteValue());
+ FieldTable headers = properties.getHeaders();
+ headers.setString("Test", "MST");
+ properties.setHeaders(headers);
+
+ MessagePublishInfo messageInfo = new TestMessagePublishInfo(directExchange, false, false, routingKey);
+
+ IncomingMessage currentMessage = null;
+
+ try
+ {
+ currentMessage = new IncomingMessage(_virtualHost.getMessageStore().getNewMessageId(),
+ messageInfo,
+ new NonTransactionalContext(_virtualHost.getMessageStore(),
+ new StoreContext(), null, null),
+ new InternalTestProtocolSession());
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+
+ currentMessage.setMessageStore(_virtualHost.getMessageStore());
+ currentMessage.setExchange(directExchange);
+
+ ContentHeaderBody headerBody = new ContentHeaderBody();
+ headerBody.classId = BasicConsumeBodyImpl.CLASS_ID;
+ headerBody.bodySize = 0;
+
+ headerBody.properties = properties;
+
+ try
+ {
+ currentMessage.setContentHeaderBody(headerBody);
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+
+ currentMessage.setExpiration();
+
+ try
+ {
+ currentMessage.route();
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+
+ try
+ {
+ currentMessage.routingComplete(_virtualHost.getMessageStore(), new MessageHandleFactory());
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+
+ // check and deliver if header says body length is zero
+ if (currentMessage.allContentReceived())
+ {
+ try
+ {
+ currentMessage.deliverToQueues();
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+ }
+
+ private void createAllQueues()
+ {
+ //Register Durable Priority Queue
+ createQueue(durablePriorityQueueName, true, true);
+
+ //Register Durable Simple Queue
+ createQueue(durableQueueName, false, true);
+
+ //Register NON-Durable Priority Queue
+ createQueue(priorityQueueName, true, false);
+
+ //Register NON-Durable Simple Queue
+ createQueue(queueName, false, false);
+ }
+
+ private void createAllTopicQueues()
+ {
+ //Register Durable Priority Queue
+ createQueue(durablePriorityTopicQueueName, true, true);
+
+ //Register Durable Simple Queue
+ createQueue(durableTopicQueueName, false, true);
+
+ //Register NON-Durable Priority Queue
+ createQueue(priorityTopicQueueName, true, false);
+
+ //Register NON-Durable Simple Queue
+ createQueue(topicQueueName, false, false);
+ }
+
+ private Exchange createExchange(ExchangeType type, AMQShortString name, boolean durable)
+ {
+ Exchange exchange = null;
+
+ try
+ {
+ exchange = type.newInstance(_virtualHost, name, durable, 0, false);
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+
+ try
+ {
+ _virtualHost.getExchangeRegistry().registerExchange(exchange);
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ return exchange;
+ }
+
+ private void createQueue(AMQShortString queueName, boolean usePriority, boolean durable)
+ {
+
+ FieldTable queueArguments = null;
+
+ if (usePriority)
+ {
+ queueArguments = new FieldTable();
+ queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL);
+ }
+
+ AMQQueue queue = null;
+
+ //Ideally we would be able to use the QueueDeclareHandler here.
+ try
+ {
+ queue = AMQQueueFactory.createAMQQueueImpl(queueName, durable, queueOwner, false, _virtualHost,
+ queueArguments);
+
+ validateQueueProperties(queue, usePriority);
+
+ if (queue.isDurable() && !queue.isAutoDelete())
+ {
+ _virtualHost.getMessageStore().createQueue(queue, queueArguments);
+ }
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+
+ try
+ {
+ _virtualHost.getQueueRegistry().registerQueue(queue);
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+
+ }
+
+ private void bindAllQueuesToExchange(Exchange exchange, AMQShortString routingKey)
+ {
+ FieldTable queueArguments = new FieldTable();
+ queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL);
+
+ QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
+
+ bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durablePriorityQueueName), false, queueArguments);
+ bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableQueueName), false, null);
+ bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(priorityQueueName), false, queueArguments);
+ bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(queueName), false, null);
+ }
+
+ private void bindAllTopicQueuesToExchange(Exchange exchange, AMQShortString routingKey)
+ {
+ FieldTable queueArguments = new FieldTable();
+ queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, DEFAULT_PRIORTY_LEVEL);
+
+ QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
+
+ bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durablePriorityTopicQueueName), true, queueArguments);
+ bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableTopicQueueName), true, null);
+ bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(priorityTopicQueueName), true, queueArguments);
+ bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(topicQueueName), true, null);
+ }
+
+
+ protected void bindQueueToExchange(Exchange exchange, AMQShortString routingKey, AMQQueue queue, boolean useSelector, FieldTable queueArguments)
+ {
+ try
+ {
+ exchange.registerQueue(queueName, queue, queueArguments);
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+
+ FieldTable bindArguments = null;
+
+ if (useSelector)
+ {
+ bindArguments = new FieldTable();
+ bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), "Test = 'MST'");
+ }
+
+ try
+ {
+ queue.bind(exchange, routingKey, bindArguments);
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+ private void validateMessage(long messageCount, boolean allQueues)
+ {
+ validateMessageOnTopics(messageCount, allQueues);
+ validateMessageOnQueues(messageCount, allQueues);
+ }
+
+ private void validateMessageOnTopics(long messageCount, boolean allQueues)
+ {
+ validateMessageOnQueue(durablePriorityTopicQueueName, messageCount);
+ validateMessageOnQueue(durableTopicQueueName, messageCount);
+
+ if (allQueues)
+ {
+ validateMessageOnQueue(priorityTopicQueueName, messageCount);
+ validateMessageOnQueue(topicQueueName, messageCount);
+ }
+ }
+
+ private void validateMessageOnQueues(long messageCount, boolean allQueues)
+ {
+ validateMessageOnQueue(durablePriorityQueueName, messageCount);
+ validateMessageOnQueue(durableQueueName, messageCount);
+
+ if (allQueues)
+ {
+ validateMessageOnQueue(priorityQueueName, messageCount);
+ validateMessageOnQueue(queueName, messageCount);
+ }
+ }
+
+ private void validateMessageOnQueue(AMQShortString queueName, long messageCount)
+ {
+ AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueName);
+
+ assertNotNull("Queue(" + queueName + ") not correctly registered:", queue);
+
+ assertEquals("Incorrect Message count on queue:" + queueName, messageCount, queue.getMessageCount());
+ }
+
+ private class TestMessagePublishInfo implements MessagePublishInfo
+ {
+
+ Exchange _exchange;
+ boolean _immediate;
+ boolean _mandatory;
+ AMQShortString _routingKey;
+
+ TestMessagePublishInfo(Exchange exchange, boolean immediate, boolean mandatory, AMQShortString routingKey)
+ {
+ _exchange = exchange;
+ _immediate = immediate;
+ _mandatory = mandatory;
+ _routingKey = routingKey;
+ }
+
+ public AMQShortString getExchange()
+ {
+ return _exchange.getName();
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+ //no-op
+ }
+
+ public boolean isImmediate()
+ {
+ return _immediate;
+ }
+
+ public boolean isMandatory()
+ {
+ return _mandatory;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return _routingKey;
+ }
+ }
+} \ No newline at end of file
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
index 792744903e..e2be503926 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
@@ -78,7 +78,7 @@ public class SkeletonMessageStore implements MessageStore
//To change body of implemented methods use File | Settings | File Templates.
}
- public void createQueue(AMQQueue queue) throws AMQException
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
{
}
@@ -129,6 +129,11 @@ public class SkeletonMessageStore implements MessageStore
return null;
}
+ public boolean isPersistent()
+ {
+ return false;
+ }
+
public void removeQueue(final AMQQueue queue) throws AMQException
{