summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2010-08-06 14:27:04 +0000
committerRobert Gemmell <robbie@apache.org>2010-08-06 14:27:04 +0000
commitc41701a0e329e9782772c5e704d19de9afaaaa7c (patch)
tree4627244893a2b1206cdce1d11bef25cb4c89c38f
parenta0931c1851659f80de2a94719af8ad573dcf0b73 (diff)
downloadqpid-python-c41701a0e329e9782772c5e704d19de9afaaaa7c.tar.gz
QPID-2787: split existing test into component test methods regarding persistence/recovery of queues, exchanges, messages,bindings. Add additional tests around removal of persisted items.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@982988 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java577
-rwxr-xr-xqpid/java/test-profiles/CPPExcludes10
-rw-r--r--qpid/java/test-profiles/JavaTransientExcludes10
3 files changed, 425 insertions, 172 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index d6a8e8e15d..673928e619 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -20,12 +20,12 @@
*/
package org.apache.qpid.server.store;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.qpid.server.util.InternalBrokerBaseCase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
@@ -53,102 +53,96 @@ import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.InternalBrokerBaseCase;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.util.FileUtils;
-import java.io.File;
-import java.util.List;
-
/**
* This tests the MessageStores by using the available interfaces.
*
- * This test validates that Exchanges, Queues, Bindings and Messages are persisted and
- * recovered correctly.
+ * For persistent stores, it validates that Exchanges, Queues, Bindings and
+ * Messages are persisted and recovered correctly.
*/
public class MessageStoreTest extends InternalBrokerBaseCase
{
+ public static final int DEFAULT_PRIORTY_LEVEL = 5;
+ public static final String SELECTOR_VALUE = "Test = 'MST'";
- private static final int DEFAULT_PRIORTY_LEVEL = 5;
- private static final Logger _logger = LoggerFactory.getLogger(MessageStoreTest.class);
+ AMQShortString nonDurableExchangeName = new AMQShortString("MST-NonDurableDirectExchange");
+ AMQShortString directExchangeName = new AMQShortString("MST-DirectExchange");
+ AMQShortString topicExchangeName = new AMQShortString("MST-TopicExchange");
- public void testMemoryMessageStore() throws Exception
- {
+ AMQShortString durablePriorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue-Durable");
+ AMQShortString durableTopicQueueName = new AMQShortString("MST-TopicQueue-Durable");
+ AMQShortString priorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue");
+ AMQShortString topicQueueName = new AMQShortString("MST-TopicQueue");
+
+ AMQShortString durableExclusiveQueueName = new AMQShortString("MST-Queue-Durable-Exclusive");
+ AMQShortString durablePriorityQueueName = new AMQShortString("MST-PriorityQueue-Durable");
+ AMQShortString durableQueueName = new AMQShortString("MST-Queue-Durable");
+ AMQShortString priorityQueueName = new AMQShortString("MST-PriorityQueue");
+ AMQShortString queueName = new AMQShortString("MST-Queue");
- PropertiesConfiguration config = new PropertiesConfiguration();
+ AMQShortString directRouting = new AMQShortString("MST-direct");
+ AMQShortString topicRouting = new AMQShortString("MST-topic");
- config.addProperty("store.class", "org.apache.qpid.server.store.MemoryMessageStore");
+ AMQShortString queueOwner = new AMQShortString("MST");
- runTestWithStore(config);
- }
+ protected PropertiesConfiguration _config;
- public void testDerbyMessageStore() throws Exception
+ public void setUp() throws Exception
{
- PropertiesConfiguration config = new PropertiesConfiguration();
+ super.setUp();
+
+ String storePath = System.getProperty("QPID_WORK") + "/" + getName();
+
+ _config = new PropertiesConfiguration();
+ _config.addProperty("store.class", getTestProfileMessageStoreClassName());
+ _config.addProperty("store.environment-path", storePath);
- config.addProperty("store.environment-path",
- System.getProperty("QPID_WORK") + "/derbyDB_MessageStoreTest");
- config.addProperty("store.class", "org.apache.qpid.server.store.DerbyMessageStore");
+ cleanup(new File(storePath));
- runTestWithStore(config);
+ reloadVirtualHost();
}
- private void reload(Configuration configuration)
+ protected void reloadVirtualHost()
{
+ VirtualHost original = _virtualHost;
+
if (_virtualHost != null)
{
try
{
_virtualHost.close();
_virtualHost.getApplicationRegistry().
- getVirtualHostRegistry().unregisterVirtualHost(_virtualHost);
+ getVirtualHostRegistry().unregisterVirtualHost(_virtualHost);
}
catch (Exception e)
{
fail(e.getMessage());
-
}
}
try
{
- _virtualHost = ApplicationRegistry.getInstance().createVirtualHost(new VirtualHostConfiguration(getClass().getName(), configuration));
+ _virtualHost = ApplicationRegistry.getInstance().createVirtualHost(new VirtualHostConfiguration(getClass().getName(), _config));
}
catch (Exception e)
{
e.printStackTrace();
fail(e.getMessage());
}
- }
- AMQShortString nonDurableExchangeName = new AMQShortString("MST-NonDurableDirectExchange");
- AMQShortString directExchangeName = new AMQShortString("MST-DirectExchange");
- AMQShortString topicExchangeName = new AMQShortString("MST-TopicExchange");
- AMQShortString queueOwner = new AMQShortString("MST");
-
- AMQShortString durablePriorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue-Durable");
- AMQShortString durableTopicQueueName = new AMQShortString("MST-TopicQueue-Durable");
- AMQShortString priorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue");
- AMQShortString topicQueueName = new AMQShortString("MST-TopicQueue");
-
- AMQShortString durableExclusiveQueueName = new AMQShortString("MST-Queue-Durable-Exclusive");
- AMQShortString durablePriorityQueueName = new AMQShortString("MST-PriorityQueue-Durable");
- AMQShortString durableQueueName = new AMQShortString("MST-Queue-Durable");
- AMQShortString priorityQueueName = new AMQShortString("MST-PriorityQueue");
- AMQShortString queueName = new AMQShortString("MST-Queue");
-
- AMQShortString directRouting = new AMQShortString("MST-direct");
- AMQShortString topicRouting = new AMQShortString("MST-topic");
+ assertTrue("Virtualhost has not changed, reload was not successful", original != _virtualHost);
+ }
-
- protected void runTestWithStore(Configuration configuration) throws AMQException
+ /**
+ * Old MessageStoreTest segment which runs against both persistent and non-persistent stores
+ * creating queues, exchanges and bindings and then verifying message delivery to them.
+ */
+ public void testQueueExchangeAndBindingCreation() throws Exception
{
- //Ensure Environment Path is empty
- cleanup(configuration);
-
- //Load the Virtualhost with the required MessageStore
- reload(configuration);
-
- MessageStore messageStore = _virtualHost.getMessageStore();
+ assertEquals("Should not be any existing queues", 0, _virtualHost.getQueueRegistry().getQueues().size());
createAllQueues();
createAllTopicQueues();
@@ -185,90 +179,303 @@ public class MessageStoreTest extends InternalBrokerBaseCase
//Ensure all the topics have two messages (one transient, one persistent)
validateMessageOnTopics(2, true);
- assertEquals("Not all queues correctly registered", 9, _virtualHost.getQueueRegistry().getQueues().size());
+ assertEquals("Not all queues correctly registered",
+ 9, _virtualHost.getQueueRegistry().getQueues().size());
+ }
- if (!messageStore.isPersistent())
- {
- _logger.warn("Unable to test Persistent capabilities of messages store(" + messageStore.getClass() + ") as it is not capable of peristence.");
- return;
- }
+ /**
+ * Tests message persistence by running the testQueueExchangeAndBindingCreation() method above
+ * before reloading the virtual host and ensuring that the persistent messages were restored.
+ *
+ * More specific testing of message persistence is left to store-specific unit testing.
+ */
+ public void testMessagePersistence() throws Exception
+ {
+ testQueueExchangeAndBindingCreation();
- //Reload the Virtualhost to test persistence
- _logger.info("Reloading Virtualhost");
+ reloadVirtualHost();
- VirtualHost original = _virtualHost;
+ //Validate durable queues and subscriptions still have the persistent messages
+ validateMessageOnQueues(2, false);
+ validateMessageOnTopics(1, false);
+ }
+
+ /**
+ * Tests message removal by running the testMessagePersistence() method above before
+ * clearing the queues, reloading the virtual host, and ensuring that the persistent
+ * messages were removed from the queues.
+ */
+ public void testMessageRemoval() throws Exception
+ {
+ testMessagePersistence();
- reload(configuration);
+ QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
- assertTrue("Virtualhost has not been reloaded", original != _virtualHost);
+ assertEquals("Incorrect number of queues registered after recovery",
+ 5, queueRegistry.getQueues().size());
- validateExchanges();
+ //clear the queue
+ queueRegistry.getQueue(durableQueueName).clearQueue();
+
+ //check the messages are gone
+ validateMessageOnQueue(durableQueueName, 0);
+
+ //reload and verify messages arent restored
+ reloadVirtualHost();
- //Validate Durable Queues still have the persistent message
- validateMessageOnQueues(2, false);
- //Validate Durable Queues still have the persistent message
- validateMessageOnTopics(1, false);
+ validateMessageOnQueue(durableQueueName, 0);
+ }
- //Validate Properties of Binding
- validateBindingProperties();
+ /**
+ * Tests queue persistence by creating a selection of queues with differing properties, both
+ * durable and non durable, and ensuring that following the recovery process the correct queues
+ * are present and any property manipulations (eg queue exclusivity) are correctly recovered.
+ */
+ public void testQueuePersistence() throws Exception
+ {
+ assertEquals("Should not be any existing queues",
+ 0, _virtualHost.getQueueRegistry().getQueues().size());
- //Validate Properties of Queues
- validateQueueProperties();
- validateQueueExclusivityProperty(true);
+ //create durable and non durable queues/topics
+ createAllQueues();
+ createAllTopicQueues();
- //Validate Non-Durable Queues are gone.
- assertNull("Non-Durable queue still registered:" + priorityQueueName, _virtualHost.getQueueRegistry().getQueue(priorityQueueName));
- assertNull("Non-Durable queue still registered:" + queueName, _virtualHost.getQueueRegistry().getQueue(queueName));
- assertNull("Non-Durable queue still registered:" + priorityTopicQueueName, _virtualHost.getQueueRegistry().getQueue(priorityTopicQueueName));
- assertNull("Non-Durable queue still registered:" + topicQueueName, _virtualHost.getQueueRegistry().getQueue(topicQueueName));
+ //reload the virtual host, prompting recovery of the queues/topics
+ reloadVirtualHost();
- assertEquals("Not all queues correctly registered", 5, _virtualHost.getQueueRegistry().getQueues().size());
-
- //Try updating the queue exclusivity and verify it is persisted and recovered correctly
+ QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
+
+ assertEquals("Incorrect number of queues registered after recovery",
+ 5, queueRegistry.getQueues().size());
+
+ //Validate the non-Durable Queues were not recovered.
+ assertNull("Non-Durable queue still registered:" + priorityQueueName,
+ queueRegistry.getQueue(priorityQueueName));
+ assertNull("Non-Durable queue still registered:" + queueName,
+ queueRegistry.getQueue(queueName));
+ assertNull("Non-Durable queue still registered:" + priorityTopicQueueName,
+ queueRegistry.getQueue(priorityTopicQueueName));
+ assertNull("Non-Durable queue still registered:" + topicQueueName,
+ queueRegistry.getQueue(topicQueueName));
+
+ //Validate normally expected properties of Queues/Topics
+ validateDurableQueueProperties();
+
+ //Update the durable exclusive queue's exclusivity and verify it is persisted and recovered correctly
setQueueExclusivity(false);
+ validateQueueExclusivityProperty(false);
+
+ //Reload the Virtualhost to recover the queues again
+ reloadVirtualHost();
+
+ //verify the change was persisted and recovered correctly
+ validateQueueExclusivityProperty(false);
+ }
+
+ /**
+ * Tests queue removal by creating a durable queue, verifying it recovers, and
+ * then removing it from the store, and ensuring that following the second reload
+ * process it is not recovered.
+ */
+ public void testDurableQueueRemoval() throws Exception
+ {
+ //Register Durable Queue
+ createQueue(durableQueueName, false, true, false);
+
+ QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
+ assertEquals("Incorrect number of queues registered before recovery",
+ 1, queueRegistry.getQueues().size());
+
+ reloadVirtualHost();
- //Reload the Virtualhost to test update to queue exclusivity
- _logger.info("Reloading Virtualhost");
- original = _virtualHost;
- reload(configuration);
+ queueRegistry = _virtualHost.getQueueRegistry();
+ assertEquals("Incorrect number of queues registered after first recovery",
+ 1, queueRegistry.getQueues().size());
- assertTrue("Virtualhost has not been reloaded", original != _virtualHost);
+ //test that removing the queue means it is not recovered next time
+ _virtualHost.getDurableConfigurationStore().removeQueue(queueRegistry.getQueue(durableQueueName));
- //verify the change was persisted
- validateQueueExclusivityProperty(false);
+ reloadVirtualHost();
+
+ queueRegistry = _virtualHost.getQueueRegistry();
+ assertEquals("Incorrect number of queues registered after second recovery",
+ 0, queueRegistry.getQueues().size());
+ assertNull("Durable queue was not removed:" + durableQueueName,
+ queueRegistry.getQueue(durableQueueName));
}
- private void validateExchanges()
+ /**
+ * Tests exchange persistence by creating a selection of exchanges, both durable
+ * and non durable, and ensuring that following the recovery process the correct
+ * durable exchanges are still present.
+ */
+ public void testExchangePersistence() throws Exception
{
- ExchangeRegistry registry = _virtualHost.getExchangeRegistry();
+ int origExchangeCount = _virtualHost.getExchangeRegistry().getExchangeNames().size();
+
+ Map<AMQShortString, Exchange> oldExchanges = createExchanges();
+
+ assertEquals("Incorrect number of exchanges registered before recovery",
+ origExchangeCount + 3, _virtualHost.getExchangeRegistry().getExchangeNames().size());
+
+ reloadVirtualHost();
+
+ //verify the exchanges present after recovery
+ validateExchanges(origExchangeCount, oldExchanges);
+ }
+
+ /**
+ * Tests exchange removal by creating a durable exchange, verifying it recovers, and
+ * then removing it from the store, and ensuring that following the second reload
+ * process it is not recovered.
+ */
+ public void testDurableExchangeRemoval() throws Exception
+ {
+ int origExchangeCount = _virtualHost.getExchangeRegistry().getExchangeNames().size();
+
+ createExchange(DirectExchange.TYPE, directExchangeName, true);
+
+ ExchangeRegistry exchangeRegistry = _virtualHost.getExchangeRegistry();
+ assertEquals("Incorrect number of exchanges registered before recovery",
+ origExchangeCount + 1, exchangeRegistry.getExchangeNames().size());
+
+ reloadVirtualHost();
+
+ exchangeRegistry = _virtualHost.getExchangeRegistry();
+ assertEquals("Incorrect number of exchanges registered after first recovery",
+ origExchangeCount + 1, exchangeRegistry.getExchangeNames().size());
+
+ //test that removing the exchange means it is not recovered next time
+ _virtualHost.getDurableConfigurationStore().removeExchange(exchangeRegistry.getExchange(directExchangeName));
+
+ reloadVirtualHost();
+
+ exchangeRegistry = _virtualHost.getExchangeRegistry();
+ assertEquals("Incorrect number of exchanges registered after second recovery",
+ origExchangeCount, exchangeRegistry.getExchangeNames().size());
+ assertNull("Durable exchange was not removed:" + directExchangeName,
+ exchangeRegistry.getExchange(directExchangeName));
+ }
+
+ /**
+ * Tests binding persistence by creating a selection of queues and exchanges, both durable
+ * and non durable, then adding bindings with and without selectors before reloading the
+ * virtual host and verifying that following the recovery process the correct durable
+ * bindings (those for durable queues to durable exchanges) are still present.
+ */
+ public void testBindingPersistence() throws Exception
+ {
+ int origExchangeCount = _virtualHost.getExchangeRegistry().getExchangeNames().size();
+
+ createAllQueues();
+ createAllTopicQueues();
+
+ Map<AMQShortString, Exchange> exchanges = createExchanges();
+
+ Exchange nonDurableExchange = exchanges.get(nonDurableExchangeName);
+ Exchange directExchange = exchanges.get(directExchangeName);
+ Exchange topicExchange = exchanges.get(topicExchangeName);
+
+ bindAllQueuesToExchange(nonDurableExchange, directRouting);
+ bindAllQueuesToExchange(directExchange, directRouting);
+ bindAllTopicQueuesToExchange(topicExchange, topicRouting);
+
+ assertEquals("Incorrect number of exchanges registered before recovery",
+ origExchangeCount + 3, _virtualHost.getExchangeRegistry().getExchangeNames().size());
+
+ reloadVirtualHost();
+
+ validateExchanges(origExchangeCount, exchanges);
+
+ validateBindingProperties();
+ }
+
+ /**
+ * Tests binding removal by creating a durable exchange, and queue, binding them together,
+ * recovering to verify the persistence, then removing it from the store, and ensuring
+ * that following the second reload process it is not recovered.
+ */
+ public void testDurableBindingRemoval() throws Exception
+ {
+ QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
+
+ //create durable queue and exchange, bind them
+ Exchange exch = createExchange(DirectExchange.TYPE, directExchangeName, true);
+ createQueue(durableQueueName, false, true, false);
+ bindQueueToExchange(exch, directRouting, queueRegistry.getQueue(durableQueueName), false, null);
+
+ assertEquals("Incorrect number of bindings registered before recovery",
+ 1, queueRegistry.getQueue(durableQueueName).getBindings().size());
+
+ //verify binding is actually normally recovered
+ reloadVirtualHost();
+
+ queueRegistry = _virtualHost.getQueueRegistry();
+ assertEquals("Incorrect number of bindings registered after first recovery",
+ 1, queueRegistry.getQueue(durableQueueName).getBindings().size());
+
+ ExchangeRegistry exchangeRegistry = _virtualHost.getExchangeRegistry();
+ exch = exchangeRegistry.getExchange(directExchangeName);
+ assertNotNull("Exchange was not recovered", exch);
- assertTrue(directExchangeName + " exchange NOT reloaded after failover",
- registry.getExchangeNames().contains(directExchangeName));
- assertTrue(topicExchangeName + " exchange NOT reloaded after failover",
- registry.getExchangeNames().contains(topicExchangeName));
- assertTrue(nonDurableExchangeName + " exchange reloaded after failover",
- !registry.getExchangeNames().contains(nonDurableExchangeName));
+ //remove the binding and verify result after recovery
+ unbindQueueFromExchange(exch, directRouting, queueRegistry.getQueue(durableQueueName), false, null);
+
+ reloadVirtualHost();
+
+ queueRegistry = _virtualHost.getQueueRegistry();
+ assertEquals("Incorrect number of bindings registered after second recovery",
+ 0, queueRegistry.getQueue(durableQueueName).getBindings().size());
+ }
+
+ /**
+ * Validates that the durable exchanges are still present, the non durable exchange is not,
+ * and that the new exchanges are not the same objects as the provided list (i.e. that the
+ * reload actually generated new exchange objects)
+ */
+ private void validateExchanges(int originalNumExchanges, Map<AMQShortString, Exchange> oldExchanges)
+ {
+ ExchangeRegistry registry = _virtualHost.getExchangeRegistry();
- // There are 5 required exchanges + qpid.management + our 2 durable exchanges
- assertEquals("Incorrect number of exchanges available", 6 + 2, registry.getExchangeNames().size());
+ assertTrue(directExchangeName + " exchange NOT reloaded",
+ registry.getExchangeNames().contains(directExchangeName));
+ assertTrue(topicExchangeName + " exchange NOT reloaded",
+ registry.getExchangeNames().contains(topicExchangeName));
+ assertTrue(nonDurableExchangeName + " exchange reloaded",
+ !registry.getExchangeNames().contains(nonDurableExchangeName));
+
+ //check the old exchange objects are not the same as the new exchanges
+ assertTrue(directExchangeName + " exchange NOT reloaded",
+ registry.getExchange(directExchangeName) != oldExchanges.get(directExchangeName));
+ assertTrue(topicExchangeName + " exchange NOT reloaded",
+ registry.getExchange(topicExchangeName) != oldExchanges.get(topicExchangeName));
+
+ // There should only be the original exchanges + our 2 recovered durable exchanges
+ assertEquals("Incorrect number of exchanges available",
+ originalNumExchanges + 2, registry.getExchangeNames().size());
}
- /** Validates that the Durable queues */
+ /** Validates the Durable queues and their properties are as expected following recovery */
private void validateBindingProperties()
{
QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
+ assertEquals("There should be 5 (durable) queues following recovery", 5, queueRegistry.getQueues().size());
+
validateBindingProperties(queueRegistry.getQueue(durablePriorityQueueName).getBindings(), false);
validateBindingProperties(queueRegistry.getQueue(durablePriorityTopicQueueName).getBindings(), true);
validateBindingProperties(queueRegistry.getQueue(durableQueueName).getBindings(), false);
validateBindingProperties(queueRegistry.getQueue(durableTopicQueueName).getBindings(), true);
+ validateBindingProperties(queueRegistry.getQueue(durableExclusiveQueueName).getBindings(), false);
}
/**
- * Validate that each queue is bound once.
+ * Validate that each queue is bound only once following recovery (i.e. that bindings for non durable
+ * queues or to non durable exchanges are not recovered), and if a selector should be present
+ * that it is and contains the correct value
*
* @param bindings the set of bindings to validate
- * @param useSelectors if set validate that the binding has a JMS_SELECTOR argument
+ * @param useSelectors if set, check the binding has a JMS_SELECTOR argument and the correct value for it
*/
private void validateBindingProperties(List<Binding> bindings, boolean useSelectors)
{
@@ -279,16 +486,18 @@ public class MessageStoreTest extends InternalBrokerBaseCase
if (useSelectors)
{
assertTrue("Binding does not contain a Selector argument.",
- binding.getArguments().containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()));
+ binding.getArguments().containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()));
+ assertEquals("The binding selector argument is incorrect", SELECTOR_VALUE,
+ binding.getArguments().get(AMQPFilterTypes.JMS_SELECTOR.getValue()).toString());
}
}
-
+
private void setQueueExclusivity(boolean exclusive) throws AMQException
{
QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
AMQQueue queue = queueRegistry.getQueue(durableExclusiveQueueName);
-
+
queue.setExclusive(exclusive);
}
@@ -297,32 +506,38 @@ public class MessageStoreTest extends InternalBrokerBaseCase
QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
AMQQueue queue = queueRegistry.getQueue(durableExclusiveQueueName);
-
+
assertEquals("Queue exclusivity was incorrect", queue.isExclusive(), expected);
}
-
- private void validateQueueProperties()
+
+ private void validateDurableQueueProperties()
{
QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
- validateQueueProperties(queueRegistry.getQueue(durablePriorityQueueName), true);
- validateQueueProperties(queueRegistry.getQueue(durablePriorityTopicQueueName), true);
- validateQueueProperties(queueRegistry.getQueue(durableQueueName), false);
- validateQueueProperties(queueRegistry.getQueue(durableTopicQueueName), false);
+ validateQueueProperties(queueRegistry.getQueue(durablePriorityQueueName), true, true, false);
+ validateQueueProperties(queueRegistry.getQueue(durablePriorityTopicQueueName), true, true, false);
+ validateQueueProperties(queueRegistry.getQueue(durableQueueName), false, true, false);
+ validateQueueProperties(queueRegistry.getQueue(durableTopicQueueName), false, true, false);
+ validateQueueProperties(queueRegistry.getQueue(durableExclusiveQueueName), false, true, true);
}
- private void validateQueueProperties(AMQQueue queue, boolean usePriority)
+ private void validateQueueProperties(AMQQueue queue, boolean usePriority, boolean durable, boolean exclusive)
{
if (usePriority)
{
assertEquals("Queue is no longer a Priority Queue", AMQPriorityQueue.class, queue.getClass());
- assertEquals("Priority Queue does not have set priorities", DEFAULT_PRIORTY_LEVEL, ((AMQPriorityQueue) queue).getPriorities());
+ assertEquals("Priority Queue does not have set priorities",
+ DEFAULT_PRIORTY_LEVEL, ((AMQPriorityQueue) queue).getPriorities());
}
else
{
assertEquals("Queue is no longer a Priority Queue", SimpleAMQQueue.class, queue.getClass());
}
+
+ assertEquals("Queue owner is not as expected", queueOwner, queue.getOwner());
+ assertEquals("Queue durability is not as expected", durable, queue.isDurable());
+ assertEquals("Queue exclusivity is not as expected", exclusive, queue.isExclusive());
}
/**
@@ -330,39 +545,31 @@ public class MessageStoreTest extends InternalBrokerBaseCase
*
* @param configuration The configuration that contains the store environment path.
*/
- private void cleanup(Configuration configuration)
+ private void cleanup(File environmentPath)
{
-
- String environment = configuration.getString("store.environment-path");
-
- if (environment != null)
+ if (environmentPath.exists())
{
- File environmentPath = new File(environment);
-
- if (environmentPath.exists())
- {
- FileUtils.delete(environmentPath, true);
- }
+ FileUtils.delete(environmentPath, true);
}
}
- private void sendMessageOnExchange(Exchange directExchange, AMQShortString routingKey, boolean deliveryMode)
+ private void sendMessageOnExchange(Exchange exchange, AMQShortString routingKey, boolean deliveryMode)
{
- //Set MessagePersustebce
+ //Set MessagePersistence
BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
properties.setDeliveryMode(deliveryMode ? Integer.valueOf(2).byteValue() : Integer.valueOf(1).byteValue());
FieldTable headers = properties.getHeaders();
headers.setString("Test", "MST");
properties.setHeaders(headers);
- MessagePublishInfo messageInfo = new TestMessagePublishInfo(directExchange, false, false, routingKey);
+ MessagePublishInfo messageInfo = new TestMessagePublishInfo(exchange, false, false, routingKey);
final IncomingMessage currentMessage;
currentMessage = new IncomingMessage(messageInfo);
- currentMessage.setExchange(directExchange);
+ currentMessage.setExchange(exchange);
ContentHeaderBody headerBody = new ContentHeaderBody();
headerBody.classId = BasicConsumeBodyImpl.CLASS_ID;
@@ -390,7 +597,6 @@ public class MessageStoreTest extends InternalBrokerBaseCase
// check and deliver if header says body length is zero
if (currentMessage.allContentReceived())
{
- // TODO Deliver to queues
ServerTransaction trans = new AutoCommitTransaction(_virtualHost.getMessageStore());
final List<? extends BaseQueue> destinationQueues = currentMessage.getDestinationQueues();
trans.enqueue(currentMessage.getDestinationQueues(), currentMessage, new ServerTransaction.Action() {
@@ -426,10 +632,10 @@ public class MessageStoreTest extends InternalBrokerBaseCase
//Register Durable Simple Queue
createQueue(durableQueueName, false, true, false);
-
+
//Register Durable Exclusive Simple Queue
createQueue(durableExclusiveQueueName, false, true, true);
-
+
//Register NON-Durable Priority Queue
createQueue(priorityQueueName, true, false, false);
@@ -452,34 +658,6 @@ public class MessageStoreTest extends InternalBrokerBaseCase
createQueue(topicQueueName, false, false, false);
}
- private Exchange createExchange(ExchangeType type, AMQShortString name, boolean durable)
- {
- Exchange exchange = null;
-
- try
- {
- exchange = type.newInstance(_virtualHost, name, durable, 0, false);
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
-
- try
- {
- _virtualHost.getExchangeRegistry().registerExchange(exchange);
- if (durable)
- {
- _virtualHost.getMessageStore().createExchange(exchange);
- }
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
- return exchange;
- }
-
private void createQueue(AMQShortString queueName, boolean usePriority, boolean durable, boolean exclusive)
{
@@ -497,9 +675,9 @@ public class MessageStoreTest extends InternalBrokerBaseCase
try
{
queue = AMQQueueFactory.createAMQQueueImpl(queueName, durable, queueOwner, false, exclusive,
- _virtualHost, queueArguments);
+ _virtualHost, queueArguments);
- validateQueueProperties(queue, usePriority);
+ validateQueueProperties(queue, usePriority, durable, exclusive);
if (queue.isDurable() && !queue.isAutoDelete())
{
@@ -515,6 +693,48 @@ public class MessageStoreTest extends InternalBrokerBaseCase
}
+ private Map<AMQShortString, Exchange> createExchanges()
+ {
+ Map<AMQShortString, Exchange> exchanges = new HashMap<AMQShortString, Exchange>();
+
+ //Register non-durable DirectExchange
+ exchanges.put(nonDurableExchangeName, createExchange(DirectExchange.TYPE, nonDurableExchangeName, false));
+
+ //Register durable DirectExchange and TopicExchange
+ exchanges.put(directExchangeName ,createExchange(DirectExchange.TYPE, directExchangeName, true));
+ exchanges.put(topicExchangeName,createExchange(TopicExchange.TYPE, topicExchangeName, true));
+
+ return exchanges;
+ }
+
+ private Exchange createExchange(ExchangeType<?> type, AMQShortString name, boolean durable)
+ {
+ Exchange exchange = null;
+
+ try
+ {
+ exchange = type.newInstance(_virtualHost, name, durable, 0, false);
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+
+ try
+ {
+ _virtualHost.getExchangeRegistry().registerExchange(exchange);
+ if (durable)
+ {
+ _virtualHost.getMessageStore().createExchange(exchange);
+ }
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ return exchange;
+ }
+
private void bindAllQueuesToExchange(Exchange exchange, AMQShortString routingKey)
{
FieldTable queueArguments = new FieldTable();
@@ -526,6 +746,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableQueueName), false, null);
bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(priorityQueueName), false, queueArguments);
bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(queueName), false, null);
+ bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableExclusiveQueueName), false, null);
}
private void bindAllTopicQueuesToExchange(Exchange exchange, AMQShortString routingKey)
@@ -544,14 +765,12 @@ public class MessageStoreTest extends InternalBrokerBaseCase
protected void bindQueueToExchange(Exchange exchange, AMQShortString routingKey, AMQQueue queue, boolean useSelector, FieldTable queueArguments)
{
-
-
FieldTable bindArguments = null;
if (useSelector)
{
bindArguments = new FieldTable();
- bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), "Test = 'MST'");
+ bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), SELECTOR_VALUE );
}
try
@@ -563,11 +782,25 @@ public class MessageStoreTest extends InternalBrokerBaseCase
fail(e.getMessage());
}
}
-
- private void validateMessage(long messageCount, boolean allQueues)
+
+ protected void unbindQueueFromExchange(Exchange exchange, AMQShortString routingKey, AMQQueue queue, boolean useSelector, FieldTable queueArguments)
{
- validateMessageOnTopics(messageCount, allQueues);
- validateMessageOnQueues(messageCount, allQueues);
+ FieldTable bindArguments = null;
+
+ if (useSelector)
+ {
+ bindArguments = new FieldTable();
+ bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), SELECTOR_VALUE );
+ }
+
+ try
+ {
+ _virtualHost.getBindingFactory().removeBinding(String.valueOf(routingKey), queue, exchange, FieldTable.convertToMap(bindArguments));
+ }
+ catch (Exception e)
+ {
+ fail(e.getMessage());
+ }
}
private void validateMessageOnTopics(long messageCount, boolean allQueues)
diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes
index 8300f672ce..79e8e88c90 100755
--- a/qpid/java/test-profiles/CPPExcludes
+++ b/qpid/java/test-profiles/CPPExcludes
@@ -73,6 +73,16 @@ org.apache.qpid.server.AlertingTest#*
// The C++ server has a totally different persistence mechanism
org.apache.qpid.server.store.PersistentStoreTest#*
+// These tests are for the Java broker persistent store modules
+org.apache.qpid.server.store.MessageStoreTest#testMessagePersistence
+org.apache.qpid.server.store.MessageStoreTest#testMessageRemoval
+org.apache.qpid.server.store.MessageStoreTest#testBindingPersistence
+org.apache.qpid.server.store.MessageStoreTest#testDurableBindingRemoval
+org.apache.qpid.server.store.MessageStoreTest#testQueuePersistence
+org.apache.qpid.server.store.MessageStoreTest#testDurableQueueRemoval
+org.apache.qpid.server.store.MessageStoreTest#testExchangePersistence
+org.apache.qpid.server.store.MessageStoreTest#testDurableExchangeRemoval
+
// CPP Broker does not follow the same Logging convention as the Java broker
org.apache.qpid.server.BrokerStartupTest#*
org.apache.qpid.server.logging.*
diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes
index b4e5282228..2fc3005a0f 100644
--- a/qpid/java/test-profiles/JavaTransientExcludes
+++ b/qpid/java/test-profiles/JavaTransientExcludes
@@ -1,3 +1,13 @@
//These tests require a persistent store
org.apache.qpid.server.store.PersistentStoreTest#*
+
org.apache.qpid.test.unit.ct.DurableSubscriberTest#*
+
+org.apache.qpid.server.store.MessageStoreTest#testMessagePersistence
+org.apache.qpid.server.store.MessageStoreTest#testMessageRemoval
+org.apache.qpid.server.store.MessageStoreTest#testBindingPersistence
+org.apache.qpid.server.store.MessageStoreTest#testDurableBindingRemoval
+org.apache.qpid.server.store.MessageStoreTest#testQueuePersistence
+org.apache.qpid.server.store.MessageStoreTest#testDurableQueueRemoval
+org.apache.qpid.server.store.MessageStoreTest#testExchangePersistence
+org.apache.qpid.server.store.MessageStoreTest#testDurableExchangeRemoval