diff options
author | Robert Gemmell <robbie@apache.org> | 2010-08-06 14:27:04 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2010-08-06 14:27:04 +0000 |
commit | c41701a0e329e9782772c5e704d19de9afaaaa7c (patch) | |
tree | 4627244893a2b1206cdce1d11bef25cb4c89c38f | |
parent | a0931c1851659f80de2a94719af8ad573dcf0b73 (diff) | |
download | qpid-python-c41701a0e329e9782772c5e704d19de9afaaaa7c.tar.gz |
QPID-2787: split existing test into component test methods regarding persistence/recovery of queues, exchanges, messages,bindings. Add additional tests around removal of persisted items.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@982988 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 425 insertions, 172 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index d6a8e8e15d..673928e619 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -20,12 +20,12 @@ */ package org.apache.qpid.server.store; -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.qpid.server.util.InternalBrokerBaseCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.File; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.AMQException; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; @@ -53,102 +53,96 @@ import org.apache.qpid.server.queue.SimpleAMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.InternalBrokerBaseCase; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.util.FileUtils; -import java.io.File; -import java.util.List; - /** * This tests the MessageStores by using the available interfaces. * - * This test validates that Exchanges, Queues, Bindings and Messages are persisted and - * recovered correctly. + * For persistent stores, it validates that Exchanges, Queues, Bindings and + * Messages are persisted and recovered correctly. */ public class MessageStoreTest extends InternalBrokerBaseCase { + public static final int DEFAULT_PRIORTY_LEVEL = 5; + public static final String SELECTOR_VALUE = "Test = 'MST'"; - private static final int DEFAULT_PRIORTY_LEVEL = 5; - private static final Logger _logger = LoggerFactory.getLogger(MessageStoreTest.class); + AMQShortString nonDurableExchangeName = new AMQShortString("MST-NonDurableDirectExchange"); + AMQShortString directExchangeName = new AMQShortString("MST-DirectExchange"); + AMQShortString topicExchangeName = new AMQShortString("MST-TopicExchange"); - public void testMemoryMessageStore() throws Exception - { + AMQShortString durablePriorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue-Durable"); + AMQShortString durableTopicQueueName = new AMQShortString("MST-TopicQueue-Durable"); + AMQShortString priorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue"); + AMQShortString topicQueueName = new AMQShortString("MST-TopicQueue"); + + AMQShortString durableExclusiveQueueName = new AMQShortString("MST-Queue-Durable-Exclusive"); + AMQShortString durablePriorityQueueName = new AMQShortString("MST-PriorityQueue-Durable"); + AMQShortString durableQueueName = new AMQShortString("MST-Queue-Durable"); + AMQShortString priorityQueueName = new AMQShortString("MST-PriorityQueue"); + AMQShortString queueName = new AMQShortString("MST-Queue"); - PropertiesConfiguration config = new PropertiesConfiguration(); + AMQShortString directRouting = new AMQShortString("MST-direct"); + AMQShortString topicRouting = new AMQShortString("MST-topic"); - config.addProperty("store.class", "org.apache.qpid.server.store.MemoryMessageStore"); + AMQShortString queueOwner = new AMQShortString("MST"); - runTestWithStore(config); - } + protected PropertiesConfiguration _config; - public void testDerbyMessageStore() throws Exception + public void setUp() throws Exception { - PropertiesConfiguration config = new PropertiesConfiguration(); + super.setUp(); + + String storePath = System.getProperty("QPID_WORK") + "/" + getName(); + + _config = new PropertiesConfiguration(); + _config.addProperty("store.class", getTestProfileMessageStoreClassName()); + _config.addProperty("store.environment-path", storePath); - config.addProperty("store.environment-path", - System.getProperty("QPID_WORK") + "/derbyDB_MessageStoreTest"); - config.addProperty("store.class", "org.apache.qpid.server.store.DerbyMessageStore"); + cleanup(new File(storePath)); - runTestWithStore(config); + reloadVirtualHost(); } - private void reload(Configuration configuration) + protected void reloadVirtualHost() { + VirtualHost original = _virtualHost; + if (_virtualHost != null) { try { _virtualHost.close(); _virtualHost.getApplicationRegistry(). - getVirtualHostRegistry().unregisterVirtualHost(_virtualHost); + getVirtualHostRegistry().unregisterVirtualHost(_virtualHost); } catch (Exception e) { fail(e.getMessage()); - } } try { - _virtualHost = ApplicationRegistry.getInstance().createVirtualHost(new VirtualHostConfiguration(getClass().getName(), configuration)); + _virtualHost = ApplicationRegistry.getInstance().createVirtualHost(new VirtualHostConfiguration(getClass().getName(), _config)); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } - } - AMQShortString nonDurableExchangeName = new AMQShortString("MST-NonDurableDirectExchange"); - AMQShortString directExchangeName = new AMQShortString("MST-DirectExchange"); - AMQShortString topicExchangeName = new AMQShortString("MST-TopicExchange"); - AMQShortString queueOwner = new AMQShortString("MST"); - - AMQShortString durablePriorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue-Durable"); - AMQShortString durableTopicQueueName = new AMQShortString("MST-TopicQueue-Durable"); - AMQShortString priorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue"); - AMQShortString topicQueueName = new AMQShortString("MST-TopicQueue"); - - AMQShortString durableExclusiveQueueName = new AMQShortString("MST-Queue-Durable-Exclusive"); - AMQShortString durablePriorityQueueName = new AMQShortString("MST-PriorityQueue-Durable"); - AMQShortString durableQueueName = new AMQShortString("MST-Queue-Durable"); - AMQShortString priorityQueueName = new AMQShortString("MST-PriorityQueue"); - AMQShortString queueName = new AMQShortString("MST-Queue"); - - AMQShortString directRouting = new AMQShortString("MST-direct"); - AMQShortString topicRouting = new AMQShortString("MST-topic"); + assertTrue("Virtualhost has not changed, reload was not successful", original != _virtualHost); + } - - protected void runTestWithStore(Configuration configuration) throws AMQException + /** + * Old MessageStoreTest segment which runs against both persistent and non-persistent stores + * creating queues, exchanges and bindings and then verifying message delivery to them. + */ + public void testQueueExchangeAndBindingCreation() throws Exception { - //Ensure Environment Path is empty - cleanup(configuration); - - //Load the Virtualhost with the required MessageStore - reload(configuration); - - MessageStore messageStore = _virtualHost.getMessageStore(); + assertEquals("Should not be any existing queues", 0, _virtualHost.getQueueRegistry().getQueues().size()); createAllQueues(); createAllTopicQueues(); @@ -185,90 +179,303 @@ public class MessageStoreTest extends InternalBrokerBaseCase //Ensure all the topics have two messages (one transient, one persistent) validateMessageOnTopics(2, true); - assertEquals("Not all queues correctly registered", 9, _virtualHost.getQueueRegistry().getQueues().size()); + assertEquals("Not all queues correctly registered", + 9, _virtualHost.getQueueRegistry().getQueues().size()); + } - if (!messageStore.isPersistent()) - { - _logger.warn("Unable to test Persistent capabilities of messages store(" + messageStore.getClass() + ") as it is not capable of peristence."); - return; - } + /** + * Tests message persistence by running the testQueueExchangeAndBindingCreation() method above + * before reloading the virtual host and ensuring that the persistent messages were restored. + * + * More specific testing of message persistence is left to store-specific unit testing. + */ + public void testMessagePersistence() throws Exception + { + testQueueExchangeAndBindingCreation(); - //Reload the Virtualhost to test persistence - _logger.info("Reloading Virtualhost"); + reloadVirtualHost(); - VirtualHost original = _virtualHost; + //Validate durable queues and subscriptions still have the persistent messages + validateMessageOnQueues(2, false); + validateMessageOnTopics(1, false); + } + + /** + * Tests message removal by running the testMessagePersistence() method above before + * clearing the queues, reloading the virtual host, and ensuring that the persistent + * messages were removed from the queues. + */ + public void testMessageRemoval() throws Exception + { + testMessagePersistence(); - reload(configuration); + QueueRegistry queueRegistry = _virtualHost.getQueueRegistry(); - assertTrue("Virtualhost has not been reloaded", original != _virtualHost); + assertEquals("Incorrect number of queues registered after recovery", + 5, queueRegistry.getQueues().size()); - validateExchanges(); + //clear the queue + queueRegistry.getQueue(durableQueueName).clearQueue(); + + //check the messages are gone + validateMessageOnQueue(durableQueueName, 0); + + //reload and verify messages arent restored + reloadVirtualHost(); - //Validate Durable Queues still have the persistent message - validateMessageOnQueues(2, false); - //Validate Durable Queues still have the persistent message - validateMessageOnTopics(1, false); + validateMessageOnQueue(durableQueueName, 0); + } - //Validate Properties of Binding - validateBindingProperties(); + /** + * Tests queue persistence by creating a selection of queues with differing properties, both + * durable and non durable, and ensuring that following the recovery process the correct queues + * are present and any property manipulations (eg queue exclusivity) are correctly recovered. + */ + public void testQueuePersistence() throws Exception + { + assertEquals("Should not be any existing queues", + 0, _virtualHost.getQueueRegistry().getQueues().size()); - //Validate Properties of Queues - validateQueueProperties(); - validateQueueExclusivityProperty(true); + //create durable and non durable queues/topics + createAllQueues(); + createAllTopicQueues(); - //Validate Non-Durable Queues are gone. - assertNull("Non-Durable queue still registered:" + priorityQueueName, _virtualHost.getQueueRegistry().getQueue(priorityQueueName)); - assertNull("Non-Durable queue still registered:" + queueName, _virtualHost.getQueueRegistry().getQueue(queueName)); - assertNull("Non-Durable queue still registered:" + priorityTopicQueueName, _virtualHost.getQueueRegistry().getQueue(priorityTopicQueueName)); - assertNull("Non-Durable queue still registered:" + topicQueueName, _virtualHost.getQueueRegistry().getQueue(topicQueueName)); + //reload the virtual host, prompting recovery of the queues/topics + reloadVirtualHost(); - assertEquals("Not all queues correctly registered", 5, _virtualHost.getQueueRegistry().getQueues().size()); - - //Try updating the queue exclusivity and verify it is persisted and recovered correctly + QueueRegistry queueRegistry = _virtualHost.getQueueRegistry(); + + assertEquals("Incorrect number of queues registered after recovery", + 5, queueRegistry.getQueues().size()); + + //Validate the non-Durable Queues were not recovered. + assertNull("Non-Durable queue still registered:" + priorityQueueName, + queueRegistry.getQueue(priorityQueueName)); + assertNull("Non-Durable queue still registered:" + queueName, + queueRegistry.getQueue(queueName)); + assertNull("Non-Durable queue still registered:" + priorityTopicQueueName, + queueRegistry.getQueue(priorityTopicQueueName)); + assertNull("Non-Durable queue still registered:" + topicQueueName, + queueRegistry.getQueue(topicQueueName)); + + //Validate normally expected properties of Queues/Topics + validateDurableQueueProperties(); + + //Update the durable exclusive queue's exclusivity and verify it is persisted and recovered correctly setQueueExclusivity(false); + validateQueueExclusivityProperty(false); + + //Reload the Virtualhost to recover the queues again + reloadVirtualHost(); + + //verify the change was persisted and recovered correctly + validateQueueExclusivityProperty(false); + } + + /** + * Tests queue removal by creating a durable queue, verifying it recovers, and + * then removing it from the store, and ensuring that following the second reload + * process it is not recovered. + */ + public void testDurableQueueRemoval() throws Exception + { + //Register Durable Queue + createQueue(durableQueueName, false, true, false); + + QueueRegistry queueRegistry = _virtualHost.getQueueRegistry(); + assertEquals("Incorrect number of queues registered before recovery", + 1, queueRegistry.getQueues().size()); + + reloadVirtualHost(); - //Reload the Virtualhost to test update to queue exclusivity - _logger.info("Reloading Virtualhost"); - original = _virtualHost; - reload(configuration); + queueRegistry = _virtualHost.getQueueRegistry(); + assertEquals("Incorrect number of queues registered after first recovery", + 1, queueRegistry.getQueues().size()); - assertTrue("Virtualhost has not been reloaded", original != _virtualHost); + //test that removing the queue means it is not recovered next time + _virtualHost.getDurableConfigurationStore().removeQueue(queueRegistry.getQueue(durableQueueName)); - //verify the change was persisted - validateQueueExclusivityProperty(false); + reloadVirtualHost(); + + queueRegistry = _virtualHost.getQueueRegistry(); + assertEquals("Incorrect number of queues registered after second recovery", + 0, queueRegistry.getQueues().size()); + assertNull("Durable queue was not removed:" + durableQueueName, + queueRegistry.getQueue(durableQueueName)); } - private void validateExchanges() + /** + * Tests exchange persistence by creating a selection of exchanges, both durable + * and non durable, and ensuring that following the recovery process the correct + * durable exchanges are still present. + */ + public void testExchangePersistence() throws Exception { - ExchangeRegistry registry = _virtualHost.getExchangeRegistry(); + int origExchangeCount = _virtualHost.getExchangeRegistry().getExchangeNames().size(); + + Map<AMQShortString, Exchange> oldExchanges = createExchanges(); + + assertEquals("Incorrect number of exchanges registered before recovery", + origExchangeCount + 3, _virtualHost.getExchangeRegistry().getExchangeNames().size()); + + reloadVirtualHost(); + + //verify the exchanges present after recovery + validateExchanges(origExchangeCount, oldExchanges); + } + + /** + * Tests exchange removal by creating a durable exchange, verifying it recovers, and + * then removing it from the store, and ensuring that following the second reload + * process it is not recovered. + */ + public void testDurableExchangeRemoval() throws Exception + { + int origExchangeCount = _virtualHost.getExchangeRegistry().getExchangeNames().size(); + + createExchange(DirectExchange.TYPE, directExchangeName, true); + + ExchangeRegistry exchangeRegistry = _virtualHost.getExchangeRegistry(); + assertEquals("Incorrect number of exchanges registered before recovery", + origExchangeCount + 1, exchangeRegistry.getExchangeNames().size()); + + reloadVirtualHost(); + + exchangeRegistry = _virtualHost.getExchangeRegistry(); + assertEquals("Incorrect number of exchanges registered after first recovery", + origExchangeCount + 1, exchangeRegistry.getExchangeNames().size()); + + //test that removing the exchange means it is not recovered next time + _virtualHost.getDurableConfigurationStore().removeExchange(exchangeRegistry.getExchange(directExchangeName)); + + reloadVirtualHost(); + + exchangeRegistry = _virtualHost.getExchangeRegistry(); + assertEquals("Incorrect number of exchanges registered after second recovery", + origExchangeCount, exchangeRegistry.getExchangeNames().size()); + assertNull("Durable exchange was not removed:" + directExchangeName, + exchangeRegistry.getExchange(directExchangeName)); + } + + /** + * Tests binding persistence by creating a selection of queues and exchanges, both durable + * and non durable, then adding bindings with and without selectors before reloading the + * virtual host and verifying that following the recovery process the correct durable + * bindings (those for durable queues to durable exchanges) are still present. + */ + public void testBindingPersistence() throws Exception + { + int origExchangeCount = _virtualHost.getExchangeRegistry().getExchangeNames().size(); + + createAllQueues(); + createAllTopicQueues(); + + Map<AMQShortString, Exchange> exchanges = createExchanges(); + + Exchange nonDurableExchange = exchanges.get(nonDurableExchangeName); + Exchange directExchange = exchanges.get(directExchangeName); + Exchange topicExchange = exchanges.get(topicExchangeName); + + bindAllQueuesToExchange(nonDurableExchange, directRouting); + bindAllQueuesToExchange(directExchange, directRouting); + bindAllTopicQueuesToExchange(topicExchange, topicRouting); + + assertEquals("Incorrect number of exchanges registered before recovery", + origExchangeCount + 3, _virtualHost.getExchangeRegistry().getExchangeNames().size()); + + reloadVirtualHost(); + + validateExchanges(origExchangeCount, exchanges); + + validateBindingProperties(); + } + + /** + * Tests binding removal by creating a durable exchange, and queue, binding them together, + * recovering to verify the persistence, then removing it from the store, and ensuring + * that following the second reload process it is not recovered. + */ + public void testDurableBindingRemoval() throws Exception + { + QueueRegistry queueRegistry = _virtualHost.getQueueRegistry(); + + //create durable queue and exchange, bind them + Exchange exch = createExchange(DirectExchange.TYPE, directExchangeName, true); + createQueue(durableQueueName, false, true, false); + bindQueueToExchange(exch, directRouting, queueRegistry.getQueue(durableQueueName), false, null); + + assertEquals("Incorrect number of bindings registered before recovery", + 1, queueRegistry.getQueue(durableQueueName).getBindings().size()); + + //verify binding is actually normally recovered + reloadVirtualHost(); + + queueRegistry = _virtualHost.getQueueRegistry(); + assertEquals("Incorrect number of bindings registered after first recovery", + 1, queueRegistry.getQueue(durableQueueName).getBindings().size()); + + ExchangeRegistry exchangeRegistry = _virtualHost.getExchangeRegistry(); + exch = exchangeRegistry.getExchange(directExchangeName); + assertNotNull("Exchange was not recovered", exch); - assertTrue(directExchangeName + " exchange NOT reloaded after failover", - registry.getExchangeNames().contains(directExchangeName)); - assertTrue(topicExchangeName + " exchange NOT reloaded after failover", - registry.getExchangeNames().contains(topicExchangeName)); - assertTrue(nonDurableExchangeName + " exchange reloaded after failover", - !registry.getExchangeNames().contains(nonDurableExchangeName)); + //remove the binding and verify result after recovery + unbindQueueFromExchange(exch, directRouting, queueRegistry.getQueue(durableQueueName), false, null); + + reloadVirtualHost(); + + queueRegistry = _virtualHost.getQueueRegistry(); + assertEquals("Incorrect number of bindings registered after second recovery", + 0, queueRegistry.getQueue(durableQueueName).getBindings().size()); + } + + /** + * Validates that the durable exchanges are still present, the non durable exchange is not, + * and that the new exchanges are not the same objects as the provided list (i.e. that the + * reload actually generated new exchange objects) + */ + private void validateExchanges(int originalNumExchanges, Map<AMQShortString, Exchange> oldExchanges) + { + ExchangeRegistry registry = _virtualHost.getExchangeRegistry(); - // There are 5 required exchanges + qpid.management + our 2 durable exchanges - assertEquals("Incorrect number of exchanges available", 6 + 2, registry.getExchangeNames().size()); + assertTrue(directExchangeName + " exchange NOT reloaded", + registry.getExchangeNames().contains(directExchangeName)); + assertTrue(topicExchangeName + " exchange NOT reloaded", + registry.getExchangeNames().contains(topicExchangeName)); + assertTrue(nonDurableExchangeName + " exchange reloaded", + !registry.getExchangeNames().contains(nonDurableExchangeName)); + + //check the old exchange objects are not the same as the new exchanges + assertTrue(directExchangeName + " exchange NOT reloaded", + registry.getExchange(directExchangeName) != oldExchanges.get(directExchangeName)); + assertTrue(topicExchangeName + " exchange NOT reloaded", + registry.getExchange(topicExchangeName) != oldExchanges.get(topicExchangeName)); + + // There should only be the original exchanges + our 2 recovered durable exchanges + assertEquals("Incorrect number of exchanges available", + originalNumExchanges + 2, registry.getExchangeNames().size()); } - /** Validates that the Durable queues */ + /** Validates the Durable queues and their properties are as expected following recovery */ private void validateBindingProperties() { QueueRegistry queueRegistry = _virtualHost.getQueueRegistry(); + assertEquals("There should be 5 (durable) queues following recovery", 5, queueRegistry.getQueues().size()); + validateBindingProperties(queueRegistry.getQueue(durablePriorityQueueName).getBindings(), false); validateBindingProperties(queueRegistry.getQueue(durablePriorityTopicQueueName).getBindings(), true); validateBindingProperties(queueRegistry.getQueue(durableQueueName).getBindings(), false); validateBindingProperties(queueRegistry.getQueue(durableTopicQueueName).getBindings(), true); + validateBindingProperties(queueRegistry.getQueue(durableExclusiveQueueName).getBindings(), false); } /** - * Validate that each queue is bound once. + * Validate that each queue is bound only once following recovery (i.e. that bindings for non durable + * queues or to non durable exchanges are not recovered), and if a selector should be present + * that it is and contains the correct value * * @param bindings the set of bindings to validate - * @param useSelectors if set validate that the binding has a JMS_SELECTOR argument + * @param useSelectors if set, check the binding has a JMS_SELECTOR argument and the correct value for it */ private void validateBindingProperties(List<Binding> bindings, boolean useSelectors) { @@ -279,16 +486,18 @@ public class MessageStoreTest extends InternalBrokerBaseCase if (useSelectors) { assertTrue("Binding does not contain a Selector argument.", - binding.getArguments().containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue())); + binding.getArguments().containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue())); + assertEquals("The binding selector argument is incorrect", SELECTOR_VALUE, + binding.getArguments().get(AMQPFilterTypes.JMS_SELECTOR.getValue()).toString()); } } - + private void setQueueExclusivity(boolean exclusive) throws AMQException { QueueRegistry queueRegistry = _virtualHost.getQueueRegistry(); AMQQueue queue = queueRegistry.getQueue(durableExclusiveQueueName); - + queue.setExclusive(exclusive); } @@ -297,32 +506,38 @@ public class MessageStoreTest extends InternalBrokerBaseCase QueueRegistry queueRegistry = _virtualHost.getQueueRegistry(); AMQQueue queue = queueRegistry.getQueue(durableExclusiveQueueName); - + assertEquals("Queue exclusivity was incorrect", queue.isExclusive(), expected); } - - private void validateQueueProperties() + + private void validateDurableQueueProperties() { QueueRegistry queueRegistry = _virtualHost.getQueueRegistry(); - validateQueueProperties(queueRegistry.getQueue(durablePriorityQueueName), true); - validateQueueProperties(queueRegistry.getQueue(durablePriorityTopicQueueName), true); - validateQueueProperties(queueRegistry.getQueue(durableQueueName), false); - validateQueueProperties(queueRegistry.getQueue(durableTopicQueueName), false); + validateQueueProperties(queueRegistry.getQueue(durablePriorityQueueName), true, true, false); + validateQueueProperties(queueRegistry.getQueue(durablePriorityTopicQueueName), true, true, false); + validateQueueProperties(queueRegistry.getQueue(durableQueueName), false, true, false); + validateQueueProperties(queueRegistry.getQueue(durableTopicQueueName), false, true, false); + validateQueueProperties(queueRegistry.getQueue(durableExclusiveQueueName), false, true, true); } - private void validateQueueProperties(AMQQueue queue, boolean usePriority) + private void validateQueueProperties(AMQQueue queue, boolean usePriority, boolean durable, boolean exclusive) { if (usePriority) { assertEquals("Queue is no longer a Priority Queue", AMQPriorityQueue.class, queue.getClass()); - assertEquals("Priority Queue does not have set priorities", DEFAULT_PRIORTY_LEVEL, ((AMQPriorityQueue) queue).getPriorities()); + assertEquals("Priority Queue does not have set priorities", + DEFAULT_PRIORTY_LEVEL, ((AMQPriorityQueue) queue).getPriorities()); } else { assertEquals("Queue is no longer a Priority Queue", SimpleAMQQueue.class, queue.getClass()); } + + assertEquals("Queue owner is not as expected", queueOwner, queue.getOwner()); + assertEquals("Queue durability is not as expected", durable, queue.isDurable()); + assertEquals("Queue exclusivity is not as expected", exclusive, queue.isExclusive()); } /** @@ -330,39 +545,31 @@ public class MessageStoreTest extends InternalBrokerBaseCase * * @param configuration The configuration that contains the store environment path. */ - private void cleanup(Configuration configuration) + private void cleanup(File environmentPath) { - - String environment = configuration.getString("store.environment-path"); - - if (environment != null) + if (environmentPath.exists()) { - File environmentPath = new File(environment); - - if (environmentPath.exists()) - { - FileUtils.delete(environmentPath, true); - } + FileUtils.delete(environmentPath, true); } } - private void sendMessageOnExchange(Exchange directExchange, AMQShortString routingKey, boolean deliveryMode) + private void sendMessageOnExchange(Exchange exchange, AMQShortString routingKey, boolean deliveryMode) { - //Set MessagePersustebce + //Set MessagePersistence BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); properties.setDeliveryMode(deliveryMode ? Integer.valueOf(2).byteValue() : Integer.valueOf(1).byteValue()); FieldTable headers = properties.getHeaders(); headers.setString("Test", "MST"); properties.setHeaders(headers); - MessagePublishInfo messageInfo = new TestMessagePublishInfo(directExchange, false, false, routingKey); + MessagePublishInfo messageInfo = new TestMessagePublishInfo(exchange, false, false, routingKey); final IncomingMessage currentMessage; currentMessage = new IncomingMessage(messageInfo); - currentMessage.setExchange(directExchange); + currentMessage.setExchange(exchange); ContentHeaderBody headerBody = new ContentHeaderBody(); headerBody.classId = BasicConsumeBodyImpl.CLASS_ID; @@ -390,7 +597,6 @@ public class MessageStoreTest extends InternalBrokerBaseCase // check and deliver if header says body length is zero if (currentMessage.allContentReceived()) { - // TODO Deliver to queues ServerTransaction trans = new AutoCommitTransaction(_virtualHost.getMessageStore()); final List<? extends BaseQueue> destinationQueues = currentMessage.getDestinationQueues(); trans.enqueue(currentMessage.getDestinationQueues(), currentMessage, new ServerTransaction.Action() { @@ -426,10 +632,10 @@ public class MessageStoreTest extends InternalBrokerBaseCase //Register Durable Simple Queue createQueue(durableQueueName, false, true, false); - + //Register Durable Exclusive Simple Queue createQueue(durableExclusiveQueueName, false, true, true); - + //Register NON-Durable Priority Queue createQueue(priorityQueueName, true, false, false); @@ -452,34 +658,6 @@ public class MessageStoreTest extends InternalBrokerBaseCase createQueue(topicQueueName, false, false, false); } - private Exchange createExchange(ExchangeType type, AMQShortString name, boolean durable) - { - Exchange exchange = null; - - try - { - exchange = type.newInstance(_virtualHost, name, durable, 0, false); - } - catch (AMQException e) - { - fail(e.getMessage()); - } - - try - { - _virtualHost.getExchangeRegistry().registerExchange(exchange); - if (durable) - { - _virtualHost.getMessageStore().createExchange(exchange); - } - } - catch (AMQException e) - { - fail(e.getMessage()); - } - return exchange; - } - private void createQueue(AMQShortString queueName, boolean usePriority, boolean durable, boolean exclusive) { @@ -497,9 +675,9 @@ public class MessageStoreTest extends InternalBrokerBaseCase try { queue = AMQQueueFactory.createAMQQueueImpl(queueName, durable, queueOwner, false, exclusive, - _virtualHost, queueArguments); + _virtualHost, queueArguments); - validateQueueProperties(queue, usePriority); + validateQueueProperties(queue, usePriority, durable, exclusive); if (queue.isDurable() && !queue.isAutoDelete()) { @@ -515,6 +693,48 @@ public class MessageStoreTest extends InternalBrokerBaseCase } + private Map<AMQShortString, Exchange> createExchanges() + { + Map<AMQShortString, Exchange> exchanges = new HashMap<AMQShortString, Exchange>(); + + //Register non-durable DirectExchange + exchanges.put(nonDurableExchangeName, createExchange(DirectExchange.TYPE, nonDurableExchangeName, false)); + + //Register durable DirectExchange and TopicExchange + exchanges.put(directExchangeName ,createExchange(DirectExchange.TYPE, directExchangeName, true)); + exchanges.put(topicExchangeName,createExchange(TopicExchange.TYPE, topicExchangeName, true)); + + return exchanges; + } + + private Exchange createExchange(ExchangeType<?> type, AMQShortString name, boolean durable) + { + Exchange exchange = null; + + try + { + exchange = type.newInstance(_virtualHost, name, durable, 0, false); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + + try + { + _virtualHost.getExchangeRegistry().registerExchange(exchange); + if (durable) + { + _virtualHost.getMessageStore().createExchange(exchange); + } + } + catch (AMQException e) + { + fail(e.getMessage()); + } + return exchange; + } + private void bindAllQueuesToExchange(Exchange exchange, AMQShortString routingKey) { FieldTable queueArguments = new FieldTable(); @@ -526,6 +746,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableQueueName), false, null); bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(priorityQueueName), false, queueArguments); bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(queueName), false, null); + bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableExclusiveQueueName), false, null); } private void bindAllTopicQueuesToExchange(Exchange exchange, AMQShortString routingKey) @@ -544,14 +765,12 @@ public class MessageStoreTest extends InternalBrokerBaseCase protected void bindQueueToExchange(Exchange exchange, AMQShortString routingKey, AMQQueue queue, boolean useSelector, FieldTable queueArguments) { - - FieldTable bindArguments = null; if (useSelector) { bindArguments = new FieldTable(); - bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), "Test = 'MST'"); + bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), SELECTOR_VALUE ); } try @@ -563,11 +782,25 @@ public class MessageStoreTest extends InternalBrokerBaseCase fail(e.getMessage()); } } - - private void validateMessage(long messageCount, boolean allQueues) + + protected void unbindQueueFromExchange(Exchange exchange, AMQShortString routingKey, AMQQueue queue, boolean useSelector, FieldTable queueArguments) { - validateMessageOnTopics(messageCount, allQueues); - validateMessageOnQueues(messageCount, allQueues); + FieldTable bindArguments = null; + + if (useSelector) + { + bindArguments = new FieldTable(); + bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), SELECTOR_VALUE ); + } + + try + { + _virtualHost.getBindingFactory().removeBinding(String.valueOf(routingKey), queue, exchange, FieldTable.convertToMap(bindArguments)); + } + catch (Exception e) + { + fail(e.getMessage()); + } } private void validateMessageOnTopics(long messageCount, boolean allQueues) diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes index 8300f672ce..79e8e88c90 100755 --- a/qpid/java/test-profiles/CPPExcludes +++ b/qpid/java/test-profiles/CPPExcludes @@ -73,6 +73,16 @@ org.apache.qpid.server.AlertingTest#* // The C++ server has a totally different persistence mechanism org.apache.qpid.server.store.PersistentStoreTest#* +// These tests are for the Java broker persistent store modules +org.apache.qpid.server.store.MessageStoreTest#testMessagePersistence +org.apache.qpid.server.store.MessageStoreTest#testMessageRemoval +org.apache.qpid.server.store.MessageStoreTest#testBindingPersistence +org.apache.qpid.server.store.MessageStoreTest#testDurableBindingRemoval +org.apache.qpid.server.store.MessageStoreTest#testQueuePersistence +org.apache.qpid.server.store.MessageStoreTest#testDurableQueueRemoval +org.apache.qpid.server.store.MessageStoreTest#testExchangePersistence +org.apache.qpid.server.store.MessageStoreTest#testDurableExchangeRemoval + // CPP Broker does not follow the same Logging convention as the Java broker org.apache.qpid.server.BrokerStartupTest#* org.apache.qpid.server.logging.* diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes index b4e5282228..2fc3005a0f 100644 --- a/qpid/java/test-profiles/JavaTransientExcludes +++ b/qpid/java/test-profiles/JavaTransientExcludes @@ -1,3 +1,13 @@ //These tests require a persistent store org.apache.qpid.server.store.PersistentStoreTest#* + org.apache.qpid.test.unit.ct.DurableSubscriberTest#* + +org.apache.qpid.server.store.MessageStoreTest#testMessagePersistence +org.apache.qpid.server.store.MessageStoreTest#testMessageRemoval +org.apache.qpid.server.store.MessageStoreTest#testBindingPersistence +org.apache.qpid.server.store.MessageStoreTest#testDurableBindingRemoval +org.apache.qpid.server.store.MessageStoreTest#testQueuePersistence +org.apache.qpid.server.store.MessageStoreTest#testDurableQueueRemoval +org.apache.qpid.server.store.MessageStoreTest#testExchangePersistence +org.apache.qpid.server.store.MessageStoreTest#testDurableExchangeRemoval |