summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/systests/src/main/java')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java8
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java42
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java (renamed from qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java)96
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java20
4 files changed, 82 insertions, 84 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
index 406a20d557..0993783e54 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
@@ -27,7 +27,7 @@ import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.model.ConfiguredObject;
-public class QuotaMessageStore extends NullMessageStore
+public class QuotaMessageStore extends AbstractMemoryMessageStore
{
public static final String TYPE = "QuotaMessageStore";
private final AtomicLong _messageId = new AtomicLong(1);
@@ -155,10 +155,4 @@ public class QuotaMessageStore extends NullMessageStore
}
}
}
-
- @Override
- public String getStoreType()
- {
- return TYPE;
- }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index e20196c98d..95bffa89aa 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -31,6 +31,10 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.plugin.MessageStoreFactory;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
public class SlowMessageStore implements MessageStore, DurableConfigurationStore
{
@@ -63,12 +67,6 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
}
}
- @Override
- public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
- {
- _realDurableConfigurationStore.recoverConfigurationStore(recoveryHandler);
- }
-
private void configureDelays(Map<String, Object> delays)
{
@@ -294,12 +292,6 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
}
@Override
- public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
- {
- _realMessageStore.recoverMessageStore(messageRecoveryHandler, transactionLogRecoveryHandler);
- }
-
- @Override
public void addEventListener(EventListener eventListener, Event... events)
{
if (_realMessageStore == null)
@@ -319,15 +311,33 @@ public class SlowMessageStore implements MessageStore, DurableConfigurationStore
}
@Override
- public String getStoreType()
+ public void onDelete()
{
- return TYPE;
+ _realMessageStore.onDelete();
}
@Override
- public void onDelete()
+ public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException
{
- _realMessageStore.onDelete();
+ _realDurableConfigurationStore.visitConfiguredObjectRecords(handler);
+ }
+
+ @Override
+ public void visitMessages(MessageHandler handler) throws StoreException
+ {
+ _realMessageStore.visitMessages(handler);
+ }
+
+ @Override
+ public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+ {
+ _realMessageStore.visitMessageInstances(handler);
+ }
+
+ @Override
+ public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+ {
+ _realMessageStore.visitDistributedTransactions(handler);
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
index d89f5cc66e..7db8210753 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.store;
-
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -46,7 +45,6 @@ import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.Binding;
-import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
@@ -68,14 +66,16 @@ import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
/**
- * This tests the MessageStores by using the available interfaces.
+ *
+ * Virtualhost/store integration test. Tests for correct behaviour of the message store
+ * when exercised via the higher level functions of the store.
*
* For persistent stores, it validates that Exchanges, Queues, Bindings and
* Messages are persisted and recovered correctly.
*/
-public class MessageStoreTest extends QpidTestCase
+public class VirtualHostMessageStoreTest extends QpidTestCase
{
- private static final Logger _logger = Logger.getLogger(MessageStoreTest.class);
+ private static final Logger _logger = Logger.getLogger(VirtualHostMessageStoreTest.class);
public static final int DEFAULT_PRIORTY_LEVEL = 5;
public static final String SELECTOR_VALUE = "Test = 'MST'";
@@ -103,8 +103,7 @@ public class MessageStoreTest extends QpidTestCase
private String queueOwner = "MST";
private VirtualHost _virtualHost;
- private org.apache.qpid.server.model.VirtualHost _virtualHostModel;
- private Broker _broker;
+ private org.apache.qpid.server.model.VirtualHost<?> _virtualHostModel;
private String _storePath;
public void setUp() throws Exception
@@ -120,6 +119,7 @@ public class MessageStoreTest extends QpidTestCase
messageStoreSettings.put(MessageStore.STORE_TYPE, getTestProfileMessageStoreType());
_virtualHostModel = mock(org.apache.qpid.server.model.VirtualHost.class);
+
when(_virtualHostModel.getMessageStoreSettings()).thenReturn(messageStoreSettings);
when(_virtualHostModel.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.TYPE))).thenReturn(StandardVirtualHostFactory.TYPE);
when(_virtualHostModel.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.NAME))).thenReturn(hostName);
@@ -128,8 +128,6 @@ public class MessageStoreTest extends QpidTestCase
cleanup(new File(_storePath));
- _broker = BrokerTestHelper.createBrokerMock();
-
reloadVirtualHost();
}
@@ -201,10 +199,6 @@ public class MessageStoreTest extends QpidTestCase
assertTrue("Virtualhost has not changed, reload was not successful", original != getVirtualHost());
}
- /**
- * Old MessageStoreTest segment which runs against both persistent and non-persistent stores
- * creating queues, exchanges and bindings and then verifying message delivery to them.
- */
public void testQueueExchangeAndBindingCreation() throws Exception
{
assertEquals("Should not be any existing queues", 0, getVirtualHost().getQueues().size());
@@ -213,15 +207,15 @@ public class MessageStoreTest extends QpidTestCase
createAllTopicQueues();
//Register Non-Durable DirectExchange
- ExchangeImpl nonDurableExchange = createExchange(DirectExchange.TYPE, nonDurableExchangeName, false);
+ ExchangeImpl<?> nonDurableExchange = createExchange(DirectExchange.TYPE, nonDurableExchangeName, false);
bindAllQueuesToExchange(nonDurableExchange, directRouting);
//Register DirectExchange
- ExchangeImpl directExchange = createExchange(DirectExchange.TYPE, directExchangeName, true);
+ ExchangeImpl<?> directExchange = createExchange(DirectExchange.TYPE, directExchangeName, true);
bindAllQueuesToExchange(directExchange, directRouting);
//Register TopicExchange
- ExchangeImpl topicExchange = createExchange(TopicExchange.TYPE, topicExchangeName, true);
+ ExchangeImpl<?> topicExchange = createExchange(TopicExchange.TYPE, topicExchangeName, true);
bindAllTopicQueuesToExchange(topicExchange, topicRouting);
//Send Message To NonDurable direct Exchange = persistent
@@ -248,12 +242,6 @@ public class MessageStoreTest extends QpidTestCase
10, getVirtualHost().getQueues().size());
}
- /**
- * Tests message persistence by running the testQueueExchangeAndBindingCreation() method above
- * before reloading the virtual host and ensuring that the persistent messages were restored.
- *
- * More specific testing of message persistence is left to store-specific unit testing.
- */
public void testMessagePersistence() throws Exception
{
testQueueExchangeAndBindingCreation();
@@ -346,7 +334,7 @@ public class MessageStoreTest extends QpidTestCase
1, getVirtualHost().getQueues().size());
//test that removing the queue means it is not recovered next time
- final AMQQueue queue = getVirtualHost().getQueue(durableQueueName);
+ final AMQQueue<?> queue = getVirtualHost().getQueue(durableQueueName);
DurableConfigurationStoreHelper.removeQueue(getVirtualHost().getDurableConfigurationStore(),queue);
reloadVirtualHost();
@@ -397,7 +385,7 @@ public class MessageStoreTest extends QpidTestCase
origExchangeCount + 1, getVirtualHost().getExchanges().size());
//test that removing the exchange means it is not recovered next time
- final ExchangeImpl exchange = getVirtualHost().getExchange(directExchangeName);
+ final ExchangeImpl<?> exchange = getVirtualHost().getExchange(directExchangeName);
DurableConfigurationStoreHelper.removeExchange(getVirtualHost().getDurableConfigurationStore(), exchange);
reloadVirtualHost();
@@ -423,9 +411,9 @@ public class MessageStoreTest extends QpidTestCase
Map<String, ExchangeImpl<?>> exchanges = createExchanges();
- ExchangeImpl nonDurableExchange = exchanges.get(nonDurableExchangeName);
- ExchangeImpl directExchange = exchanges.get(directExchangeName);
- ExchangeImpl topicExchange = exchanges.get(topicExchangeName);
+ ExchangeImpl<?> nonDurableExchange = exchanges.get(nonDurableExchangeName);
+ ExchangeImpl<?> directExchange = exchanges.get(directExchangeName);
+ ExchangeImpl<?> topicExchange = exchanges.get(topicExchangeName);
bindAllQueuesToExchange(nonDurableExchange, directRouting);
bindAllQueuesToExchange(directExchange, directRouting);
@@ -449,7 +437,7 @@ public class MessageStoreTest extends QpidTestCase
public void testDurableBindingRemoval() throws Exception
{
//create durable queue and exchange, bind them
- ExchangeImpl exch = createExchange(DirectExchange.TYPE, directExchangeName, true);
+ ExchangeImpl<?> exch = createExchange(DirectExchange.TYPE, directExchangeName, true);
createQueue(durableQueueName, false, true, false, false);
bindQueueToExchange(exch, directRouting, getVirtualHost().getQueue(durableQueueName), false);
@@ -482,7 +470,7 @@ public class MessageStoreTest extends QpidTestCase
private void validateExchanges(int originalNumExchanges, Map<String, ExchangeImpl<?>> oldExchanges)
{
Collection<ExchangeImpl<?>> exchanges = getVirtualHost().getExchanges();
- Collection<String> exchangeNames = new ArrayList(exchanges.size());
+ Collection<String> exchangeNames = new ArrayList<String>(exchanges.size());
for(ExchangeImpl<?> exchange : exchanges)
{
exchangeNames.add(exchange.getName());
@@ -506,6 +494,7 @@ public class MessageStoreTest extends QpidTestCase
}
/** Validates the Durable queues and their properties are as expected following recovery */
+ @SuppressWarnings("unchecked")
private void validateBindingProperties()
{
@@ -526,11 +515,11 @@ public class MessageStoreTest extends QpidTestCase
* @param bindings the set of bindings to validate
* @param useSelectors if set, check the binding has a JMS_SELECTOR argument and the correct value for it
*/
- private void validateBindingProperties(Collection<? extends Binding> bindings, boolean useSelectors)
+ private void validateBindingProperties(Collection<? extends Binding<?>> bindings, boolean useSelectors)
{
assertEquals("Each queue should only be bound once.", 1, bindings.size());
- Binding binding = bindings.iterator().next();
+ Binding<?> binding = bindings.iterator().next();
if (useSelectors)
{
@@ -543,13 +532,13 @@ public class MessageStoreTest extends QpidTestCase
private void setQueueExclusivity(boolean exclusive) throws MessageSource.ExistingConsumerPreventsExclusive
{
- AMQQueue queue = getVirtualHost().getQueue(durableExclusiveQueueName);
+ AMQQueue<?> queue = getVirtualHost().getQueue(durableExclusiveQueueName);
queue.setExclusivityPolicy(exclusive ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.NONE);
}
private void validateQueueExclusivityProperty(boolean expected)
{
- AMQQueue queue = getVirtualHost().getQueue(durableExclusiveQueueName);
+ AMQQueue<?> queue = getVirtualHost().getQueue(durableExclusiveQueueName);
assertEquals("Queue exclusivity was incorrect", queue.isExclusive(), expected);
}
@@ -565,7 +554,7 @@ public class MessageStoreTest extends QpidTestCase
validateQueueProperties(getVirtualHost().getQueue(durableLastValueQueueName), false, true, true, true);
}
- private void validateQueueProperties(AMQQueue queue, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue)
+ private void validateQueueProperties(AMQQueue<?> queue, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue)
{
if(usePriority || lastValueQueue)
{
@@ -588,9 +577,9 @@ public class MessageStoreTest extends QpidTestCase
assertEquals("Queue is not 'simple'", StandardQueue.class, queue.getClass());
}
- assertEquals("Queue owner is not as expected", exclusive ? queueOwner : null, queue.getOwner());
- assertEquals("Queue durability is not as expected", durable, queue.isDurable());
- assertEquals("Queue exclusivity is not as expected", exclusive, queue.isExclusive());
+ assertEquals("Queue owner is not as expected for queue " + queue.getName(), exclusive ? queueOwner : null, queue.getOwner());
+ assertEquals("Queue durability is not as expected for queue " + queue.getName(), durable, queue.isDurable());
+ assertEquals("Queue exclusivity is not as expected for queue " + queue.getName(), exclusive, queue.isExclusive());
}
/**
@@ -606,7 +595,7 @@ public class MessageStoreTest extends QpidTestCase
}
}
- private void sendMessageOnExchange(ExchangeImpl exchange, String routingKey, boolean deliveryMode)
+ private void sendMessageOnExchange(ExchangeImpl<?> exchange, String routingKey, boolean deliveryMode)
{
//Set MessagePersistence
BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
@@ -698,15 +687,12 @@ public class MessageStoreTest extends QpidTestCase
{
queueArguments.put(Queue.OWNER, queueOwner);
}
- AMQQueue queue = null;
+ AMQQueue<?> queue = null;
//Ideally we would be able to use the QueueDeclareHandler here.
queue = getVirtualHost().createQueue(queueArguments);
validateQueueProperties(queue, usePriority, durable, exclusive, lastValueQueue);
-
-
-
}
private Map<String, ExchangeImpl<?>> createExchanges() throws Exception
@@ -733,14 +719,14 @@ public class MessageStoreTest extends QpidTestCase
attributes.put(org.apache.qpid.server.model.Exchange.TYPE, type.getType());
attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, durable);
attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
- false ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
+ durable ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
exchange = getVirtualHost().createExchange(attributes);
return exchange;
}
- private void bindAllQueuesToExchange(ExchangeImpl exchange, String routingKey)
+ private void bindAllQueuesToExchange(ExchangeImpl<?> exchange, String routingKey)
{
bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durablePriorityQueueName), false);
bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durableQueueName), false);
@@ -749,7 +735,7 @@ public class MessageStoreTest extends QpidTestCase
bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durableExclusiveQueueName), false);
}
- private void bindAllTopicQueuesToExchange(ExchangeImpl exchange, String routingKey)
+ private void bindAllTopicQueuesToExchange(ExchangeImpl<?> exchange, String routingKey)
{
bindQueueToExchange(exchange, routingKey, getVirtualHost().getQueue(durablePriorityTopicQueueName), true);
@@ -759,9 +745,9 @@ public class MessageStoreTest extends QpidTestCase
}
- protected void bindQueueToExchange(ExchangeImpl exchange,
+ protected void bindQueueToExchange(ExchangeImpl<?> exchange,
String routingKey,
- AMQQueue queue,
+ AMQQueue<?> queue,
boolean useSelector)
{
Map<String,Object> bindArguments = new HashMap<String, Object>();
@@ -781,9 +767,9 @@ public class MessageStoreTest extends QpidTestCase
}
}
- protected void unbindQueueFromExchange(ExchangeImpl exchange,
+ protected void unbindQueueFromExchange(ExchangeImpl<?> exchange,
String routingKey,
- AMQQueue queue,
+ AMQQueue<?> queue,
boolean useSelector)
{
Map<String,Object> bindArguments = new HashMap<String, Object>();
@@ -829,7 +815,7 @@ public class MessageStoreTest extends QpidTestCase
private void validateMessageOnQueue(String queueName, long messageCount)
{
- AMQQueue queue = getVirtualHost().getQueue(queueName);
+ AMQQueue<?> queue = getVirtualHost().getQueue(queueName);
assertNotNull("Queue(" + queueName + ") not correctly registered:", queue);
@@ -839,12 +825,12 @@ public class MessageStoreTest extends QpidTestCase
private class TestMessagePublishInfo implements MessagePublishInfo
{
- ExchangeImpl _exchange;
+ ExchangeImpl<?> _exchange;
boolean _immediate;
boolean _mandatory;
String _routingKey;
- TestMessagePublishInfo(ExchangeImpl exchange, boolean immediate, boolean mandatory, String routingKey)
+ TestMessagePublishInfo(ExchangeImpl<?> exchange, boolean immediate, boolean mandatory, String routingKey)
{
_exchange = exchange;
_immediate = immediate;
@@ -852,29 +838,35 @@ public class MessageStoreTest extends QpidTestCase
_routingKey = routingKey;
}
+ @Override
public AMQShortString getExchange()
{
return new AMQShortString(_exchange.getName());
}
+ @Override
public void setExchange(AMQShortString exchange)
{
//no-op
}
+ @Override
public boolean isImmediate()
{
return _immediate;
}
+ @Override
public boolean isMandatory()
{
return _mandatory;
}
+ @Override
public AMQShortString getRoutingKey()
{
return new AMQShortString(_routingKey);
}
}
+
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java
index 4c0e2b7ffc..59b4d496fa 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java
@@ -45,10 +45,9 @@ import org.apache.qpid.server.model.SystemContext;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.security.access.FileAccessControlProviderConstants;
import org.apache.qpid.server.security.group.FileGroupManagerFactory;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
-import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
public class TestBrokerConfiguration
{
@@ -191,7 +190,7 @@ public class TestBrokerConfiguration
private ConfiguredObjectRecord findObject(final Class<? extends ConfiguredObject> category, final String objectName)
{
final RecordFindingVisitor visitor = new RecordFindingVisitor(category, objectName);
- _store.recoverConfigurationStore(visitor);
+ _store.visitConfiguredObjectRecords(visitor);
return visitor.getFoundRecord();
}
@@ -235,11 +234,12 @@ public class TestBrokerConfiguration
return findObject(category, name).getAttributes();
}
- private static class RecordFindingVisitor implements ConfigurationRecoveryHandler
+ private static class RecordFindingVisitor implements ConfiguredObjectRecordHandler
{
private final Class<? extends ConfiguredObject> _category;
private final String _objectName;
public ConfiguredObjectRecord _foundRecord;
+ private int _version;
public RecordFindingVisitor(final Class<? extends ConfiguredObject> category, final String objectName)
{
@@ -248,26 +248,28 @@ public class TestBrokerConfiguration
}
@Override
- public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+ public void begin(final int configVersion)
{
-
+ _version = configVersion;
}
@Override
- public void configuredObject(final ConfiguredObjectRecord object)
+ public boolean handle(final ConfiguredObjectRecord object)
{
if (object.getType().equals(_category.getSimpleName())
&& (_objectName == null
|| _objectName.equals(object.getAttributes().get(ConfiguredObject.NAME))))
{
_foundRecord = object;
+ return false;
}
+ return true;
}
@Override
- public int completeConfigurationRecovery()
+ public int end()
{
- return 0;
+ return _version;
}
public ConfiguredObjectRecord getFoundRecord()