diff options
Diffstat (limited to 'qpid/java/systests/src/main/java')
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java | 8 | ||||
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java | 42 | ||||
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java (renamed from qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java) | 96 | ||||
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java | 20 |
4 files changed, 82 insertions, 84 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java index 406a20d557..0993783e54 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java @@ -27,7 +27,7 @@ import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.model.ConfiguredObject; -public class QuotaMessageStore extends NullMessageStore +public class QuotaMessageStore extends AbstractMemoryMessageStore { public static final String TYPE = "QuotaMessageStore"; private final AtomicLong _messageId = new AtomicLong(1); @@ -155,10 +155,4 @@ public class QuotaMessageStore extends NullMessageStore } } } - - @Override - public String getStoreType() - { - return TYPE; - } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index e20196c98d..95bffa89aa 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -31,6 +31,10 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.plugin.MessageStoreFactory; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; public class SlowMessageStore implements MessageStore, DurableConfigurationStore { @@ -63,12 +67,6 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore } } - @Override - public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) - { - _realDurableConfigurationStore.recoverConfigurationStore(recoveryHandler); - } - private void configureDelays(Map<String, Object> delays) { @@ -294,12 +292,6 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore } @Override - public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) - { - _realMessageStore.recoverMessageStore(messageRecoveryHandler, transactionLogRecoveryHandler); - } - - @Override public void addEventListener(EventListener eventListener, Event... events) { if (_realMessageStore == null) @@ -319,15 +311,33 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore } @Override - public String getStoreType() + public void onDelete() { - return TYPE; + _realMessageStore.onDelete(); } @Override - public void onDelete() + public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException { - _realMessageStore.onDelete(); + _realDurableConfigurationStore.visitConfiguredObjectRecords(handler); + } + + @Override + public void visitMessages(MessageHandler handler) throws StoreException + { + _realMessageStore.visitMessages(handler); + } + + @Override + public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException + { + _realMessageStore.visitMessageInstances(handler); + } + + @Override + public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException + { + _realMessageStore.visitDistributedTransactions(handler); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java index d89f5cc66e..7db8210753 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.store; - import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -46,7 +45,6 @@ import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.Binding; -import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; @@ -68,14 +66,16 @@ import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; /** - * This tests the MessageStores by using the available interfaces. + * + * Virtualhost/store integration test. Tests for correct behaviour of the message store + * when exercised via the higher level functions of the store. * * For persistent stores, it validates that Exchanges, Queues, Bindings and * Messages are persisted and recovered correctly. */ -public class MessageStoreTest extends QpidTestCase +public class VirtualHostMessageStoreTest extends QpidTestCase { - private static final Logger _logger = Logger.getLogger(MessageStoreTest.class); + private static final Logger _logger = Logger.getLogger(VirtualHostMessageStoreTest.class); public static final int DEFAULT_PRIORTY_LEVEL = 5; public static final String SELECTOR_VALUE = "Test = 'MST'"; @@ -103,8 +103,7 @@ public class MessageStoreTest extends QpidTestCase private String queueOwner = "MST"; private VirtualHost _virtualHost; - private org.apache.qpid.server.model.VirtualHost _virtualHostModel; - private Broker _broker; + private org.apache.qpid.server.model.VirtualHost<?> _virtualHostModel; private String _storePath; public void setUp() throws Exception @@ -120,6 +119,7 @@ public class MessageStoreTest extends QpidTestCase messageStoreSettings.put(MessageStore.STORE_TYPE, getTestProfileMessageStoreType()); _virtualHostModel = mock(org.apache.qpid.server.model.VirtualHost.class); + when(_virtualHostModel.getMessageStoreSettings()).thenReturn(messageStoreSettings); when(_virtualHostModel.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.TYPE))).thenReturn(StandardVirtualHostFactory.TYPE); when(_virtualHostModel.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.NAME))).thenReturn(hostName); @@ -128,8 +128,6 @@ public class MessageStoreTest extends QpidTestCase cleanup(new File(_storePath)); - _broker = BrokerTestHelper.createBrokerMock(); - reloadVirtualHost(); } @@ -201,10 +199,6 @@ public class MessageStoreTest extends QpidTestCase assertTrue("Virtualhost has not changed, reload was not successful", original != getVirtualHost()); } - /** - * Old MessageStoreTest segment which runs against both persistent and non-persistent stores - * creating queues, exchanges and bindings and then verifying message delivery to them. - */ public void testQueueExchangeAndBindingCreation() throws Exception { assertEquals("Should not be any existing queues", 0, getVirtualHost().getQueues().size()); @@ -213,15 +207,15 @@ public class MessageStoreTest extends QpidTestCase createAllTopicQueues(); //Register Non-Durable DirectExchange - ExchangeImpl nonDurableExchange = createExchange(DirectExchange.TYPE, nonDurableExchangeName, false); + ExchangeImpl<?> nonDurableExchange = createExchange(DirectExchange.TYPE, nonDurableExchangeName, false); bindAllQueuesToExchange(nonDurableExchange, directRouting); //Register DirectExchange - ExchangeImpl directExchange = createExchange(DirectExchange.TYPE, directExchangeName, true); + ExchangeImpl<?> directExchange = createExchange(DirectExchange.TYPE, directExchangeName, true); bindAllQueuesToExchange(directExchange, directRouting); //Register TopicExchange - ExchangeImpl topicExchange = createExchange(TopicExchange.TYPE, topicExchangeName, true); + ExchangeImpl<?> topicExchange = createExchange(TopicExchange.TYPE, topicExchangeName, true); bindAllTopicQueuesToExchange(topicExchange, topicRouting); //Send Message To NonDurable direct Exchange = persistent @@ -248,12 +242,6 @@ public class MessageStoreTest extends QpidTestCase 10, getVirtualHost().getQueues().size()); } - /** - * Tests message persistence by running the testQueueExchangeAndBindingCreation() method above - * before reloading the virtual host and ensuring that the persistent messages were restored. - * - * More specific testing of message persistence is left to store-specific unit testing. - */ public void testMessagePersistence() throws Exception { testQueueExchangeAndBindingCreation(); @@ -346,7 +334,7 @@ public class MessageStoreTest extends QpidTestCase 1, getVirtualHost().getQueues().size()); //test that removing the queue means it is not recovered next time - final AMQQueue queue = getVirtualHost().getQueue(durableQueueName); + final AMQQueue<?> queue = getVirtualHost().getQueue(durableQueueName); DurableConfigurationStoreHelper.removeQueue(getVirtualHost().getDurableConfigurationStore(),queue); reloadVirtualHost(); @@ -397,7 +385,7 @@ public class MessageStoreTest extends QpidTestCase origExchangeCount + 1, getVirtualHost().getExchanges().size()); //test that removing the exchange means it is not recovered next time - final ExchangeImpl exchange = getVirtualHost().getExchange(directExchangeName); + final ExchangeImpl<?> exchange = getVirtualHost().getExchange(directExchangeName); DurableConfigurationStoreHelper.removeExchange(getVirtualHost().getDurableConfigurationStore(), exchange); reloadVirtualHost(); @@ -423,9 +411,9 @@ public class MessageStoreTest extends QpidTestCase Map<String, ExchangeImpl<?>> exchanges = createExchanges(); - ExchangeImpl nonDurableExchange = exchanges.get(nonDurableExchangeName); - ExchangeImpl directExchange = exchanges.get(directExchangeName); - ExchangeImpl topicExchange = exchanges.get(topicExchangeName); + ExchangeImpl<?> nonDurableExchange = exchanges.get(nonDurableExchangeName); + ExchangeImpl<?> directExchange = exchanges.get(directExchangeName); + ExchangeImpl<?> topicExchange = exchanges.get(topicExchangeName); bindAllQueuesToExchange(nonDurableExchange, directRouting); bindAllQueuesToExchange(directExchange, directRouting); @@ -449,7 +437,7 @@ public class MessageStoreTest extends QpidTestCase public void testDurableBindingRemoval() throws Exception { //create durable queue and exchange, bind them - ExchangeImpl exch = createExchange(DirectExchange.TYPE, directExchangeName, true); + ExchangeImpl<?> exch = createExchange(DirectExchange.TYPE, directExchangeName, true); createQueue(durableQueueName, false, true, false, false); bindQueueToExchange(exch, directRouting, getVirtualHost().getQueue(durableQueueName), false); @@ -482,7 +470,7 @@ public class MessageStoreTest extends QpidTestCase private void validateExchanges(int originalNumExchanges, Map<String, ExchangeImpl<?>> oldExchanges) { Collection<ExchangeImpl<?>> exchanges = getVirtualHost().getExchanges(); - Collection<String> exchangeNames = new ArrayList(exchanges.size()); + Collection<String> exchangeNames = new ArrayList<String>(exchanges.size()); for(ExchangeImpl<?> exchange : exchanges) { exchangeNames.add(exchange.getName()); @@ -506,6 +494,7 @@ public class MessageStoreTest extends QpidTestCase } /** Validates the Durable queues and their properties are as expected following recovery */ + @SuppressWarnings("unchecked") private void validateBindingProperties() { @@ -526,11 +515,11 @@ public class MessageStoreTest extends QpidTestCase * @param bindings the set of bindings to validate * @param useSelectors if set, check the binding has a JMS_SELECTOR argument and the correct value for it */ - private void validateBindingProperties(Collection<? extends Binding> bindings, boolean useSelectors) + private void validateBindingProperties(Collection<? extends Binding<?>> bindings, boolean useSelectors) { assertEquals("Each queue should only be bound once.", 1, bindings.size()); - Binding binding = bindings.iterator().next(); + Binding<?> binding = bindings.iterator().next(); if (useSelectors) { @@ -543,13 +532,13 @@ public class MessageStoreTest extends QpidTestCase private void setQueueExclusivity(boolean exclusive) throws MessageSource.ExistingConsumerPreventsExclusive { - AMQQueue queue = getVirtualHost().getQueue(durableExclusiveQueueName); + AMQQueue<?> queue = getVirtualHost().getQueue(durableExclusiveQueueName); queue.setExclusivityPolicy(exclusive ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.NONE); } private void validateQueueExclusivityProperty(boolean expected) { - AMQQueue queue = getVirtualHost().getQueue(durableExclusiveQueueName); + AMQQueue<?> queue = getVirtualHost().getQueue(durableExclusiveQueueName); assertEquals("Queue exclusivity was incorrect", queue.isExclusive(), expected); } @@ -565,7 +554,7 @@ public class MessageStoreTest extends QpidTestCase validateQueueProperties(getVirtualHost().getQueue(durableLastValueQueueName), false, true, true, true); } - private void validateQueueProperties(AMQQueue queue, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue) + private void validateQueueProperties(AMQQueue<?> queue, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue) { if(usePriority || lastValueQueue) { @@ -588,9 +577,9 @@ public class MessageStoreTest extends QpidTestCase assertEquals("Queue is not 'simple'", StandardQueue.class, queue.getClass()); } - assertEquals("Queue owner is not as expected", exclusive ? queueOwner : null, queue.getOwner()); - assertEquals("Queue durability is not as expected", durable, queue.isDurable()); - assertEquals("Queue exclusivity is not as expected", exclusive, queue.isExclusive()); + assertEquals("Queue owner is not as expected for queue " + queue.getName(), exclusive ? queueOwner : null, queue.getOwner()); + assertEquals("Queue durability is not as expected for queue " + queue.getName(), durable, queue.isDurable()); + assertEquals("Queue exclusivity is not as expected for queue " + queue.getName(), exclusive, queue.isExclusive()); } /** @@ -606,7 +595,7 @@ public class MessageStoreTest extends QpidTestCase } } - private void sendMessageOnExchange(ExchangeImpl exchange, String routingKey, boolean deliveryMode) + private void sendMessageOnExchange(ExchangeImpl<?> exchange, String routingKey, boolean deliveryMode) { //Set MessagePersistence BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); @@ -698,15 +687,12 @@ public class MessageStoreTest extends QpidTestCase { queueArguments.put(Queue.OWNER, queueOwner); } - AMQQueue queue = null; + AMQQueue<?> queue = null; //Ideally we would be able to use the QueueDeclareHandler here. queue = getVirtualHost().createQueue(queueArguments); validateQueueProperties(queue, usePriority, durable, exclusive, lastValueQueue); - - - } private Map<String, ExchangeImpl<?>> createExchanges() throws Exception @@ -733,14 +719,14 @@ public class MessageStoreTest extends QpidTestCase attributes.put(org.apache.qpid.server.model.Exchange.TYPE, type.getType()); attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, durable); attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, - false ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); + durable ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); exchange = getVirtualHost().createExchange(attributes); return exchange; } - private void bindAllQueuesToExchange(ExchangeImpl exchange, String routingKey) + private void bindAllQueuesToExchange(ExchangeImpl<?> exchange, String routingKey) { bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durablePriorityQueueName), false); bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durableQueueName), false); @@ -749,7 +735,7 @@ public class MessageStoreTest extends QpidTestCase bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durableExclusiveQueueName), false); } - private void bindAllTopicQueuesToExchange(ExchangeImpl exchange, String routingKey) + private void bindAllTopicQueuesToExchange(ExchangeImpl<?> exchange, String routingKey) { bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durablePriorityTopicQueueName), true); @@ -759,9 +745,9 @@ public class MessageStoreTest extends QpidTestCase } - protected void bindQueueToExchange(ExchangeImpl exchange, + protected void bindQueueToExchange(ExchangeImpl<?> exchange, String routingKey, - AMQQueue queue, + AMQQueue<?> queue, boolean useSelector) { Map<String,Object> bindArguments = new HashMap<String, Object>(); @@ -781,9 +767,9 @@ public class MessageStoreTest extends QpidTestCase } } - protected void unbindQueueFromExchange(ExchangeImpl exchange, + protected void unbindQueueFromExchange(ExchangeImpl<?> exchange, String routingKey, - AMQQueue queue, + AMQQueue<?> queue, boolean useSelector) { Map<String,Object> bindArguments = new HashMap<String, Object>(); @@ -829,7 +815,7 @@ public class MessageStoreTest extends QpidTestCase private void validateMessageOnQueue(String queueName, long messageCount) { - AMQQueue queue = getVirtualHost().getQueue(queueName); + AMQQueue<?> queue = getVirtualHost().getQueue(queueName); assertNotNull("Queue(" + queueName + ") not correctly registered:", queue); @@ -839,12 +825,12 @@ public class MessageStoreTest extends QpidTestCase private class TestMessagePublishInfo implements MessagePublishInfo { - ExchangeImpl _exchange; + ExchangeImpl<?> _exchange; boolean _immediate; boolean _mandatory; String _routingKey; - TestMessagePublishInfo(ExchangeImpl exchange, boolean immediate, boolean mandatory, String routingKey) + TestMessagePublishInfo(ExchangeImpl<?> exchange, boolean immediate, boolean mandatory, String routingKey) { _exchange = exchange; _immediate = immediate; @@ -852,29 +838,35 @@ public class MessageStoreTest extends QpidTestCase _routingKey = routingKey; } + @Override public AMQShortString getExchange() { return new AMQShortString(_exchange.getName()); } + @Override public void setExchange(AMQShortString exchange) { //no-op } + @Override public boolean isImmediate() { return _immediate; } + @Override public boolean isMandatory() { return _mandatory; } + @Override public AMQShortString getRoutingKey() { return new AMQShortString(_routingKey); } } + } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java index 4c0e2b7ffc..59b4d496fa 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java @@ -45,10 +45,9 @@ import org.apache.qpid.server.model.SystemContext; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.security.access.FileAccessControlProviderConstants; import org.apache.qpid.server.security.group.FileGroupManagerFactory; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; -import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; public class TestBrokerConfiguration { @@ -191,7 +190,7 @@ public class TestBrokerConfiguration private ConfiguredObjectRecord findObject(final Class<? extends ConfiguredObject> category, final String objectName) { final RecordFindingVisitor visitor = new RecordFindingVisitor(category, objectName); - _store.recoverConfigurationStore(visitor); + _store.visitConfiguredObjectRecords(visitor); return visitor.getFoundRecord(); } @@ -235,11 +234,12 @@ public class TestBrokerConfiguration return findObject(category, name).getAttributes(); } - private static class RecordFindingVisitor implements ConfigurationRecoveryHandler + private static class RecordFindingVisitor implements ConfiguredObjectRecordHandler { private final Class<? extends ConfiguredObject> _category; private final String _objectName; public ConfiguredObjectRecord _foundRecord; + private int _version; public RecordFindingVisitor(final Class<? extends ConfiguredObject> category, final String objectName) { @@ -248,26 +248,28 @@ public class TestBrokerConfiguration } @Override - public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion) + public void begin(final int configVersion) { - + _version = configVersion; } @Override - public void configuredObject(final ConfiguredObjectRecord object) + public boolean handle(final ConfiguredObjectRecord object) { if (object.getType().equals(_category.getSimpleName()) && (_objectName == null || _objectName.equals(object.getAttributes().get(ConfiguredObject.NAME)))) { _foundRecord = object; + return false; } + return true; } @Override - public int completeConfigurationRecovery() + public int end() { - return 0; + return _version; } public ConfiguredObjectRecord getFoundRecord() |