summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/test/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/test/java')
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java81
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java60
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java6
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java57
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java4
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java404
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java21
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java15
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java64
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java91
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java149
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java47
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java18
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java414
15 files changed, 1033 insertions, 401 deletions
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java
index 0fe9d1ac49..1de857d224 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java
@@ -50,10 +50,10 @@ import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.SystemContext;
import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.test.utils.QpidTestCase;
public class ManagementModeStoreHandlerTest extends QpidTestCase
@@ -89,20 +89,22 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase
when(_portEntry.getParents()).thenReturn(Collections.singletonMap(Broker.class.getSimpleName(), _root));
when(_portEntry.getType()).thenReturn(Port.class.getSimpleName());
- final ArgumentCaptor<ConfigurationRecoveryHandler> recovererArgumentCaptor = ArgumentCaptor.forClass(ConfigurationRecoveryHandler.class);
+ final ArgumentCaptor<ConfiguredObjectRecordHandler> recovererArgumentCaptor = ArgumentCaptor.forClass(ConfiguredObjectRecordHandler.class);
doAnswer(
new Answer()
{
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable
{
- ConfigurationRecoveryHandler recoverer = recovererArgumentCaptor.getValue();
- recoverer.configuredObject(_root);
- recoverer.configuredObject(_portEntry);
+ ConfiguredObjectRecordHandler recoverer = recovererArgumentCaptor.getValue();
+ if(recoverer.handle(_root))
+ {
+ recoverer.handle(_portEntry);
+ }
return null;
}
}
- ).when(_store).recoverConfigurationStore(recovererArgumentCaptor.capture());
+ ).when(_store).visitConfiguredObjectRecords(recovererArgumentCaptor.capture());
_options = new BrokerOptions();
_handler = new ManagementModeStoreHandler(_store, _options);
@@ -112,21 +114,21 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase
private ConfiguredObjectRecord getRootEntry()
{
BrokerFinder brokerFinder = new BrokerFinder();
- _handler.recoverConfigurationStore(brokerFinder);
+ _handler.visitConfiguredObjectRecords(brokerFinder);
return brokerFinder.getBrokerRecord();
}
private ConfiguredObjectRecord getEntry(UUID id)
{
RecordFinder recordFinder = new RecordFinder(id);
- _handler.recoverConfigurationStore(recordFinder);
+ _handler.visitConfiguredObjectRecords(recordFinder);
return recordFinder.getFoundRecord();
}
private Collection<UUID> getChildrenIds(ConfiguredObjectRecord record)
{
ChildFinder childFinder = new ChildFinder(record);
- _handler.recoverConfigurationStore(childFinder);
+ _handler.visitConfiguredObjectRecords(childFinder);
return childFinder.getChildIds();
}
@@ -288,21 +290,25 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase
attributes.put(VirtualHost.TYPE, "STANDARD");
final ConfiguredObjectRecord virtualHost = new ConfiguredObjectRecordImpl(virtualHostId, VirtualHost.class.getSimpleName(), attributes, Collections.singletonMap(Broker.class.getSimpleName(), _root));
- final ArgumentCaptor<ConfigurationRecoveryHandler> recovererArgumentCaptor = ArgumentCaptor.forClass(ConfigurationRecoveryHandler.class);
+ final ArgumentCaptor<ConfiguredObjectRecordHandler> recovererArgumentCaptor = ArgumentCaptor.forClass(ConfiguredObjectRecordHandler.class);
doAnswer(
new Answer()
{
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable
{
- ConfigurationRecoveryHandler recoverer = recovererArgumentCaptor.getValue();
- recoverer.configuredObject(_root);
- recoverer.configuredObject(_portEntry);
- recoverer.configuredObject(virtualHost);
+ ConfiguredObjectRecordHandler recoverer = recovererArgumentCaptor.getValue();
+ if(recoverer.handle(_root))
+ {
+ if(recoverer.handle(_portEntry))
+ {
+ recoverer.handle(virtualHost);
+ }
+ }
return null;
}
}
- ).when(_store).recoverConfigurationStore(recovererArgumentCaptor.capture());
+ ).when(_store).visitConfiguredObjectRecords(recovererArgumentCaptor.capture());
State expectedState = mmQuiesceVhosts ? State.QUIESCED : null;
if(mmQuiesceVhosts)
@@ -457,28 +463,32 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase
}
- private class BrokerFinder implements ConfigurationRecoveryHandler
+ private class BrokerFinder implements ConfiguredObjectRecordHandler
{
private ConfiguredObjectRecord _brokerRecord;
+ private int _version;
+
@Override
- public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+ public void begin(final int configVersion)
{
-
+ _version = configVersion;
}
@Override
- public void configuredObject(final ConfiguredObjectRecord object)
+ public boolean handle(final ConfiguredObjectRecord object)
{
if(object.getType().equals(Broker.class.getSimpleName()))
{
_brokerRecord = object;
+ return false;
}
+ return true;
}
@Override
- public int completeConfigurationRecovery()
+ public int end()
{
- return 0;
+ return _version;
}
public ConfiguredObjectRecord getBrokerRecord()
@@ -487,10 +497,11 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase
}
}
- private class RecordFinder implements ConfigurationRecoveryHandler
+ private class RecordFinder implements ConfiguredObjectRecordHandler
{
private final UUID _id;
private ConfiguredObjectRecord _foundRecord;
+ private int _version;
private RecordFinder(final UUID id)
{
@@ -498,24 +509,26 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase
}
@Override
- public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+ public void begin(final int configVersion)
{
-
+ _version = configVersion;
}
@Override
- public void configuredObject(final ConfiguredObjectRecord object)
+ public boolean handle(final ConfiguredObjectRecord object)
{
if(object.getId().equals(_id))
{
_foundRecord = object;
+ return false;
}
+ return true;
}
@Override
- public int completeConfigurationRecovery()
+ public int end()
{
- return 0;
+ return _version;
}
public ConfiguredObjectRecord getFoundRecord()
@@ -524,10 +537,11 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase
}
}
- private class ChildFinder implements ConfigurationRecoveryHandler
+ private class ChildFinder implements ConfiguredObjectRecordHandler
{
private final Collection<UUID> _childIds = new HashSet<UUID>();
private final ConfiguredObjectRecord _parent;
+ private int _version;
private ChildFinder(final ConfiguredObjectRecord parent)
{
@@ -535,13 +549,13 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase
}
@Override
- public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+ public void begin(final int configVersion)
{
-
+ _version = configVersion;
}
@Override
- public void configuredObject(final ConfiguredObjectRecord object)
+ public boolean handle(final ConfiguredObjectRecord object)
{
if(object.getParents() != null)
@@ -555,12 +569,13 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase
}
}
+ return true;
}
@Override
- public int completeConfigurationRecovery()
+ public int end()
{
- return 0;
+ return _version;
}
public Collection<UUID> getChildIds()
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index 83052110a1..b38d9d7bd2 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -21,9 +21,7 @@
package org.apache.qpid.server.store;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -49,6 +47,7 @@ import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
import org.mockito.ArgumentCaptor;
@@ -71,9 +70,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
private String _storePath;
private String _storeName;
- private ConfigurationRecoveryHandler _recoveryHandler;
+ private ConfiguredObjectRecordHandler _handler;
- private ExchangeImpl _exchange = mock(ExchangeImpl.class);
private static final String ROUTING_KEY = "routingKey";
private static final String QUEUE_NAME = "queueName";
private Map<String,Object> _bindingArgs;
@@ -96,16 +94,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
FileUtils.delete(new File(_storePath), true);
setTestSystemProperty("QPID_WORK", TMP_FOLDER);
- _recoveryHandler = mock(ConfigurationRecoveryHandler.class);
- when(_exchange.getName()).thenReturn(EXCHANGE_NAME);
- when(_exchange.getId()).thenReturn(_exchangeId);
- when(_exchange.getExchangeType()).thenReturn(mock(ExchangeType.class));
- when(_exchange.getEventLogger()).thenReturn(new EventLogger());
-
- ConfiguredObjectRecord exchangeRecord = mock(ConfiguredObjectRecord.class);
- when(exchangeRecord.getId()).thenReturn(_exchangeId);
- when(exchangeRecord.getType()).thenReturn(Exchange.class.getSimpleName());
- when(_exchange.asObjectRecord()).thenReturn(exchangeRecord);
+ _handler = mock(ConfiguredObjectRecordHandler.class);
+ when(_handler.handle(any(ConfiguredObjectRecord.class))).thenReturn(true);
_bindingArgs = new HashMap<String, Object>();
String argKey = AMQPFilterTypes.JMS_SELECTOR.toString();
@@ -134,7 +124,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
DurableConfigurationStoreHelper.createExchange(_configStore, exchange);
reopenStore();
- verify(_recoveryHandler).configuredObject(matchesRecord(_exchangeId, EXCHANGE,
+ verify(_handler).handle(matchesRecord(_exchangeId, EXCHANGE,
map( org.apache.qpid.server.model.Exchange.NAME, getName(),
org.apache.qpid.server.model.Exchange.TYPE, getName()+"Type",
org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name())));
@@ -168,14 +158,16 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
DurableConfigurationStoreHelper.removeExchange(_configStore, exchange);
reopenStore();
- verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class));
+ verify(_handler, never()).handle(any(ConfiguredObjectRecord.class));
}
public void testBindQueue() throws Exception
{
+ ExchangeImpl<?> exchange = createTestExchange();
AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null);
BindingImpl binding = new BindingImpl(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue,
- _exchange, _bindingArgs);
+ exchange, _bindingArgs);
+ DurableConfigurationStoreHelper.createExchange(_configStore, exchange);
DurableConfigurationStoreHelper.createQueue(_configStore, queue);
DurableConfigurationStoreHelper.createBinding(_configStore, binding);
@@ -187,10 +179,10 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
Map<String,UUID> parents = new HashMap<String, UUID>();
- parents.put(Exchange.class.getSimpleName(), _exchange.getId());
+ parents.put(Exchange.class.getSimpleName(), exchange.getId());
parents.put(Queue.class.getSimpleName(), queue.getId());
- verify(_recoveryHandler).configuredObject(matchesRecord(binding.getId(), BINDING, map, parents));
+ verify(_handler).handle(matchesRecord(binding.getId(), BINDING, map, parents));
}
@@ -260,15 +252,18 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
public void testUnbindQueue() throws Exception
{
+ ExchangeImpl<?> exchange = createTestExchange();
+ DurableConfigurationStoreHelper.createExchange(_configStore, exchange);
+
AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null);
BindingImpl binding = new BindingImpl(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue,
- _exchange, _bindingArgs);
+ exchange, _bindingArgs);
DurableConfigurationStoreHelper.createBinding(_configStore, binding);
DurableConfigurationStoreHelper.removeBinding(_configStore, binding);
reopenStore();
- verify(_recoveryHandler, never()).configuredObject(matchesRecord(ANY_UUID, BINDING,
+ verify(_handler, never()).handle(matchesRecord(ANY_UUID, BINDING,
ANY_MAP));
}
@@ -282,7 +277,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.OWNER, getName()+"Owner");
queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
- verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
+ verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
}
public void testCreateQueueAMQQueueFieldTable() throws Exception
@@ -304,7 +299,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
queueAttributes.putAll(attributes);
- verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
+ verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
}
public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception
@@ -322,7 +317,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
- verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
+ verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
}
private ExchangeImpl createTestAlternateExchange()
@@ -355,7 +350,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.NAME, getName());
queueAttributes.putAll(attributes);
- verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
+ verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
}
@@ -382,7 +377,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.putAll(attributes);
queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
- verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
+ verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
}
public void testRemoveQueue() throws Exception
@@ -397,7 +392,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
// remove queue
DurableConfigurationStoreHelper.removeQueue(_configStore,queue);
reopenStore();
- verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class));
+ verify(_handler, never()).handle(any(ConfiguredObjectRecord.class));
}
private AMQQueue createTestQueue(String queueName,
@@ -463,11 +458,9 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
{
ExchangeImpl exchange = mock(ExchangeImpl.class);
Map<String,Object> actualAttributes = new HashMap<String, Object>();
- actualAttributes.put("id", _exchangeId);
actualAttributes.put("name", getName());
actualAttributes.put("type", getName() + "Type");
actualAttributes.put("lifetimePolicy", LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
- when(exchange.getActualAttributes()).thenReturn(actualAttributes);
when(exchange.getName()).thenReturn(getName());
when(exchange.getTypeName()).thenReturn(getName() + "Type");
when(exchange.isAutoDelete()).thenReturn(true);
@@ -475,11 +468,10 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
ConfiguredObjectRecord exchangeRecord = mock(ConfiguredObjectRecord.class);
when(exchangeRecord.getId()).thenReturn(_exchangeId);
when(exchangeRecord.getType()).thenReturn(Exchange.class.getSimpleName());
- Map<String,Object> actualAttributesExceptId = new HashMap<String, Object>(actualAttributes);
- actualAttributesExceptId.remove("id");
- when(exchangeRecord.getAttributes()).thenReturn(actualAttributesExceptId);
+ when(exchangeRecord.getAttributes()).thenReturn(actualAttributes);
when(exchange.asObjectRecord()).thenReturn(exchangeRecord);
-
+ when(exchange.getExchangeType()).thenReturn(mock(ExchangeType.class));
+ when(exchange.getEventLogger()).thenReturn(new EventLogger());
return exchange;
}
@@ -491,7 +483,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
ConfiguredObject<?> parent = mock(ConfiguredObject.class);
when(parent.getName()).thenReturn("testName");
_configStore.openConfigurationStore(parent, _configurationStoreSettings);
- _configStore.recoverConfigurationStore(_recoveryHandler);
+ _configStore.visitConfiguredObjectRecords(_handler);
}
protected abstract DurableConfigurationStore createConfigStore() throws Exception;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java
index 8f2d0029f6..2400a68c93 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java
@@ -27,10 +27,4 @@ public class JsonFileConfigStoreConfigurationTest extends AbstractDurableConfigu
{
return new JsonFileConfigStore();
}
-
- @Override
- public void testBindQueue() throws Exception
- {
- // TODO: Temporarily disable the test as it is already fixed on trunk
- }
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
index 1de24e371d..6907898a6c 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
@@ -28,6 +28,7 @@ import java.util.UUID;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.test.utils.TestFileUtils;
@@ -43,15 +44,15 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
public class JsonFileConfigStoreTest extends QpidTestCase
{
- private final ConfigurationRecoveryHandler _recoveryHandler = mock(ConfigurationRecoveryHandler.class);
-
private JsonFileConfigStore _store;
private HashMap<String, Object> _configurationStoreSettings;
private ConfiguredObject<?> _virtualHost;
private File _storeLocation;
+ private ConfiguredObjectRecordHandler _handler;
private static final UUID ANY_UUID = UUID.randomUUID();
@@ -69,6 +70,9 @@ public class JsonFileConfigStoreTest extends QpidTestCase
_configurationStoreSettings.put(JsonFileConfigStore.STORE_TYPE, JsonFileConfigStore.TYPE);
_configurationStoreSettings.put(JsonFileConfigStore.STORE_PATH, _storeLocation.getAbsolutePath());
_store = new JsonFileConfigStore();
+
+ _handler = mock(ConfiguredObjectRecordHandler.class);
+ when(_handler.handle(any(ConfiguredObjectRecord.class))).thenReturn(true);
}
@Override
@@ -113,35 +117,35 @@ public class JsonFileConfigStoreTest extends QpidTestCase
}
}
- public void testStartFromNoStore() throws Exception
+ public void testVisitEmptyStore()
{
_store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
- _store.recoverConfigurationStore(_recoveryHandler);
- InOrder inorder = inOrder(_recoveryHandler);
- inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(0));
- inorder.verify(_recoveryHandler,never()).configuredObject(any(ConfiguredObjectRecord.class));
- inorder.verify(_recoveryHandler).completeConfigurationRecovery();
+ _store.visitConfiguredObjectRecords(_handler);
+ InOrder inorder = inOrder(_handler);
+ inorder.verify(_handler).begin(eq(0));
+ inorder.verify(_handler,never()).handle(any(ConfiguredObjectRecord.class));
+ inorder.verify(_handler).end();
_store.closeConfigurationStore();
}
public void testUpdatedConfigVersionIsRetained() throws Exception
{
final int NEW_CONFIG_VERSION = 42;
- when(_recoveryHandler.completeConfigurationRecovery()).thenReturn(NEW_CONFIG_VERSION);
+ when(_handler.end()).thenReturn(NEW_CONFIG_VERSION);
_store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
- _store.recoverConfigurationStore(_recoveryHandler);
+ _store.visitConfiguredObjectRecords(_handler);
_store.closeConfigurationStore();
_store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
- _store.recoverConfigurationStore(_recoveryHandler);
- InOrder inorder = inOrder(_recoveryHandler);
+ _store.visitConfiguredObjectRecords(_handler);
+ InOrder inorder = inOrder(_handler);
// first time the config version should be the initial version - 0
- inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(0));
+ inorder.verify(_handler).begin(eq(0));
// second time the config version should be the updated version
- inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(NEW_CONFIG_VERSION));
+ inorder.verify(_handler).begin(eq(NEW_CONFIG_VERSION));
_store.closeConfigurationStore();
}
@@ -157,8 +161,9 @@ public class JsonFileConfigStoreTest extends QpidTestCase
_store.closeConfigurationStore();
_store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
- _store.recoverConfigurationStore(_recoveryHandler);
- verify(_recoveryHandler).configuredObject(matchesRecord(queueId, queueType, queueAttr));
+
+ _store.visitConfiguredObjectRecords(_handler);
+ verify(_handler, times(1)).handle(matchesRecord(queueId, queueType, queueAttr));
_store.closeConfigurationStore();
}
@@ -179,8 +184,8 @@ public class JsonFileConfigStoreTest extends QpidTestCase
_store.closeConfigurationStore();
_store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
- _store.recoverConfigurationStore(_recoveryHandler);
- verify(_recoveryHandler).configuredObject(matchesRecord(queueId, queueType, queueAttr));
+ _store.visitConfiguredObjectRecords(_handler);
+ verify(_handler, times(1)).handle(matchesRecord(queueId, queueType, queueAttr));
_store.closeConfigurationStore();
}
@@ -201,8 +206,8 @@ public class JsonFileConfigStoreTest extends QpidTestCase
_store.closeConfigurationStore();
_store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
- _store.recoverConfigurationStore(_recoveryHandler);
- verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class));
+ _store.visitConfiguredObjectRecords(_handler);
+ verify(_handler, never()).handle(any(ConfiguredObjectRecord.class));
_store.closeConfigurationStore();
}
@@ -311,12 +316,12 @@ public class JsonFileConfigStoreTest extends QpidTestCase
_store.update(true, bindingRecord, binding2Record);
_store.closeConfigurationStore();
_store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
- _store.recoverConfigurationStore(_recoveryHandler);
- verify(_recoveryHandler).configuredObject(matchesRecord(queueId, "Queue", EMPTY_ATTR));
- verify(_recoveryHandler).configuredObject(matchesRecord(queue2Id, "Queue", EMPTY_ATTR));
- verify(_recoveryHandler).configuredObject(matchesRecord(exchangeId, "Exchange", EMPTY_ATTR));
- verify(_recoveryHandler).configuredObject(matchesRecord(bindingId, "Binding", EMPTY_ATTR));
- verify(_recoveryHandler).configuredObject(matchesRecord(binding2Id, "Binding", EMPTY_ATTR));
+ _store.visitConfiguredObjectRecords(_handler);
+ verify(_handler).handle(matchesRecord(queueId, "Queue", EMPTY_ATTR));
+ verify(_handler).handle(matchesRecord(queue2Id, "Queue", EMPTY_ATTR));
+ verify(_handler).handle(matchesRecord(exchangeId, "Exchange", EMPTY_ATTR));
+ verify(_handler).handle(matchesRecord(bindingId, "Binding", EMPTY_ATTR));
+ verify(_handler).handle(matchesRecord(binding2Id, "Binding", EMPTY_ATTR));
_store.closeConfigurationStore();
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
index 451a2744c3..89fef15e7e 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
@@ -33,7 +33,6 @@ import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
@@ -66,12 +65,9 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple
_store = createStore();
- MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class);
- when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class));
ConfiguredObject<?> parent = mock(ConfiguredObject.class);
when(parent.getName()).thenReturn("test");
_store.openMessageStore(parent, storeSettings);
- _store.recoverMessageStore(recoveryHandler, null);
_transactionResource = UUID.randomUUID();
_events = new ArrayList<Event>();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
index 51d3fc15d2..8bf981bd7b 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
@@ -21,29 +21,31 @@
package org.apache.qpid.server.store;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
import org.apache.qpid.server.store.Transaction.Record;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.test.utils.QpidTestCase;
+import org.mockito.ArgumentMatcher;
public abstract class MessageStoreTestCase extends QpidTestCase
{
- private MessageStoreRecoveryHandler _messageStoreRecoveryHandler;
- private StoredMessageRecoveryHandler _storedMessageRecoveryHandler;
- private TransactionLogRecoveryHandler _logRecoveryHandler;
- private TransactionLogRecoveryHandler.QueueEntryRecoveryHandler _queueEntryRecoveryHandler;
- private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler _dtxRecordRecoveryHandler;
-
private MessageStore _store;
private Map<String, Object> _storeSettings;
private ConfiguredObject<?> _parent;
@@ -55,35 +57,34 @@ public abstract class MessageStoreTestCase extends QpidTestCase
_parent = mock(ConfiguredObject.class);
when(_parent.getName()).thenReturn("test");
- _storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class);
- _logRecoveryHandler = mock(TransactionLogRecoveryHandler.class);
- _messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class);
- _queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class);
- _dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class);
-
- when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler);
- when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler);
- when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler);
-
_storeSettings = getStoreSettings();
_store = createMessageStore();
_store.openMessageStore(_parent, _storeSettings);
- _store.recoverMessageStore(_messageStoreRecoveryHandler, _logRecoveryHandler);
+
}
protected abstract Map<String, Object> getStoreSettings() throws Exception;
protected abstract MessageStore createMessageStore();
- public MessageStore getStore()
+ protected MessageStore getStore()
{
return _store;
}
- public void testRecordXid() throws Exception
+ protected void reopenStore() throws Exception
{
+ _store.closeMessageStore();
+
+ _store = createMessageStore();
+ _store.openMessageStore(_parent, _storeSettings);
+ }
+
+ public void testAddAndRemoveRecordXid() throws Exception
+ {
+ long format = 1l;
Record enqueueRecord = getTestRecord(1);
Record dequeueRecord = getTestRecord(2);
Record[] enqueues = { enqueueRecord };
@@ -92,27 +93,287 @@ public abstract class MessageStoreTestCase extends QpidTestCase
byte[] branchId = new byte[] { 2 };
Transaction transaction = _store.newTransaction();
- transaction.recordXid(1l, globalId, branchId, enqueues, dequeues);
+ transaction.recordXid(format, globalId, branchId, enqueues, dequeues);
transaction.commitTran();
+
reopenStore();
- verify(_dtxRecordRecoveryHandler).dtxRecord(1l, globalId, branchId, enqueues, dequeues);
+
+ DistributedTransactionHandler handler = mock(DistributedTransactionHandler.class);
+ _store.visitDistributedTransactions(handler);
+ verify(handler, times(1)).handle(format,globalId, branchId, enqueues, dequeues);
transaction = _store.newTransaction();
transaction.removeXid(1l, globalId, branchId);
transaction.commitTran();
reopenStore();
- verify(_dtxRecordRecoveryHandler, times(1)).dtxRecord(1l, globalId, branchId, enqueues, dequeues);
+
+ handler = mock(DistributedTransactionHandler.class);
+ _store.visitDistributedTransactions(handler);
+ verify(handler, never()).handle(format,globalId, branchId, enqueues, dequeues);
}
- private void reopenStore() throws Exception
+ public void testVisitMessages() throws Exception
{
- _store.closeMessageStore();
+ long messageId = 1;
+ int contentSize = 0;
+ final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
+ StoreFuture flushFuture = message.flushToStore();
+ flushFuture.waitForCompletion();
- _store = createMessageStore();
- _store.openMessageStore(_parent, _storeSettings);
- _store.recoverMessageStore(_messageStoreRecoveryHandler, _logRecoveryHandler);
+ MessageHandler handler = mock(MessageHandler.class);
+ _store.visitMessages(handler);
+
+ verify(handler, times(1)).handle(argThat(new MessageMetaDataMatcher(messageId)));
+
+ }
+
+ public void testVisitMessagesAborted() throws Exception
+ {
+ int contentSize = 0;
+ for (int i = 0; i < 3; i++)
+ {
+ final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize));
+ StoreFuture flushFuture = message.flushToStore();
+ flushFuture.waitForCompletion();
+ }
+
+ MessageHandler handler = mock(MessageHandler.class);
+ when(handler.handle(any(StoredMessage.class))).thenReturn(true, false);
+
+ _store.visitMessages(handler);
+
+ verify(handler, times(2)).handle(any(StoredMessage.class));
+ }
+
+ public void testReopenedMessageStoreUsesLastMessageId() throws Exception
+ {
+ int contentSize = 0;
+ for (int i = 0; i < 3; i++)
+ {
+ final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize));
+ StoreFuture flushFuture = message.flushToStore();
+ flushFuture.waitForCompletion();
+ }
+
+ reopenStore();
+
+ final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(4, contentSize));
+
+ StoreFuture flushFuture = message.flushToStore();
+ flushFuture.waitForCompletion();
+
+ assertEquals("Unexpected message id", 4, message.getMessageNumber());
+ }
+
+ public void testVisitMessageInstances() throws Exception
+ {
+ long messageId = 1;
+ int contentSize = 0;
+ final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
+ StoreFuture flushFuture = message.flushToStore();
+ flushFuture.waitForCompletion();
+
+ EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, message);
+
+ UUID queueId = UUID.randomUUID();
+ TransactionLogResource queue = createTransactionLogResource(queueId);
+
+ Transaction transaction = _store.newTransaction();
+ transaction.enqueueMessage(queue, enqueueableMessage);
+ transaction.commitTran();
+
+ MessageInstanceHandler handler = mock(MessageInstanceHandler.class);
+ _store.visitMessageInstances(handler);
+
+ verify(handler, times(1)).handle(queueId, messageId);
+ }
+
+ public void testVisitDistributedTransactions() throws Exception
+ {
+ long format = 1l;
+ byte[] branchId = new byte[] { 2 };
+ byte[] globalId = new byte[] { 1 };
+ Record enqueueRecord = getTestRecord(1);
+ Record dequeueRecord = getTestRecord(2);
+ Record[] enqueues = { enqueueRecord };
+ Record[] dequeues = { dequeueRecord };
+
+ Transaction transaction = _store.newTransaction();
+ transaction.recordXid(format, globalId, branchId, enqueues, dequeues);
+ transaction.commitTran();
+
+ DistributedTransactionHandler handler = mock(DistributedTransactionHandler.class);
+ _store.visitDistributedTransactions(handler);
+
+ verify(handler, times(1)).handle(format,globalId, branchId, enqueues, dequeues);
+
+ }
+
+ public void testCommitTransaction() throws Exception
+ {
+ final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
+ TransactionLogResource mockQueue = createTransactionLogResource(mockQueueId);
+
+ Transaction txn = getStore().newTransaction();
+
+ long messageId1 = 1L;
+ long messageId2 = 5L;
+ final EnqueueableMessage enqueueableMessage1 = createEnqueueableMessage(messageId1);
+ final EnqueueableMessage enqueueableMessage2 = createEnqueueableMessage(messageId2);
+
+ txn.enqueueMessage(mockQueue, enqueueableMessage1);
+ txn.enqueueMessage(mockQueue, enqueueableMessage2);
+ txn.commitTran();
+
+ QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId);
+ getStore().visitMessageInstances(filter);
+ Set<Long> enqueuedIds = filter.getEnqueuedIds();
+
+ assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+ assertTrue("Message with id " + messageId1 + " is not found", enqueuedIds.contains(messageId1));
+ assertTrue("Message with id " + messageId2 + " is not found", enqueuedIds.contains(messageId2));
}
+
+ public void testRollbackTransactionBeforeCommit() throws Exception
+ {
+ final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
+ TransactionLogResource mockQueue = createTransactionLogResource(mockQueueId);
+
+ long messageId1 = 21L;
+ long messageId2 = 22L;
+ long messageId3 = 23L;
+ final EnqueueableMessage enqueueableMessage1 = createEnqueueableMessage(messageId1);
+ final EnqueueableMessage enqueueableMessage2 = createEnqueueableMessage(messageId2);
+ final EnqueueableMessage enqueueableMessage3 = createEnqueueableMessage(messageId3);
+
+ Transaction txn = getStore().newTransaction();
+
+ txn.enqueueMessage(mockQueue, enqueueableMessage1);
+ txn.abortTran();
+
+ txn = getStore().newTransaction();
+ txn.enqueueMessage(mockQueue, enqueueableMessage2);
+ txn.enqueueMessage(mockQueue, enqueueableMessage3);
+ txn.commitTran();
+
+ QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId);
+ getStore().visitMessageInstances(filter);
+ Set<Long> enqueuedIds = filter.getEnqueuedIds();
+
+ assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+ assertTrue("Message with id " + messageId2 + " is not found", enqueuedIds.contains(messageId2));
+ assertTrue("Message with id " + messageId3 + " is not found", enqueuedIds.contains(messageId3));
+ }
+
+ public void testRollbackTransactionAfterCommit() throws Exception
+ {
+ final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
+ TransactionLogResource mockQueue = createTransactionLogResource(mockQueueId);
+
+ long messageId1 = 30L;
+ long messageId2 = 31L;
+ long messageId3 = 32L;
+
+ final EnqueueableMessage enqueueableMessage1 = createEnqueueableMessage(messageId1);
+ final EnqueueableMessage enqueueableMessage2 = createEnqueueableMessage(messageId2);
+ final EnqueueableMessage enqueueableMessage3 = createEnqueueableMessage(messageId3);
+
+ Transaction txn = getStore().newTransaction();
+
+ txn.enqueueMessage(mockQueue, enqueueableMessage1);
+ txn.commitTran();
+
+ txn = getStore().newTransaction();
+ txn.enqueueMessage(mockQueue, enqueueableMessage2);
+ txn.abortTran();
+
+ txn = getStore().newTransaction();
+ txn.enqueueMessage(mockQueue, enqueueableMessage3);
+ txn.commitTran();
+
+ QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId);
+ getStore().visitMessageInstances(filter);
+ Set<Long> enqueuedIds = filter.getEnqueuedIds();
+
+ assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+ assertTrue("Message with id " + messageId1 + " is not found", enqueuedIds.contains(messageId1));
+ assertTrue("Message with id " + messageId3 + " is not found", enqueuedIds.contains(messageId3));
+ }
+
+ public void testStoreIgnoresTransientMessage() throws Exception
+ {
+ long messageId = 1;
+ int contentSize = 0;
+ final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize, false));
+ StoreFuture flushFuture = message.flushToStore();
+ flushFuture.waitForCompletion();
+
+ MessageHandler handler = mock(MessageHandler.class);
+ _store.visitMessages(handler);
+
+ verify(handler, times(0)).handle(argThat(new MessageMetaDataMatcher(messageId)));
+ }
+
+ public void testAddAndRemoveMessageWithoutContent() throws Exception
+ {
+ long messageId = 1;
+ int contentSize = 0;
+ final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
+ StoreFuture flushFuture = message.flushToStore();
+ flushFuture.waitForCompletion();
+
+ final AtomicReference<StoredMessage<?>> retrievedMessageRef = new AtomicReference<StoredMessage<?>>();
+ _store.visitMessages(new MessageHandler()
+ {
+
+ @Override
+ public boolean handle(StoredMessage<?> storedMessage)
+ {
+ retrievedMessageRef.set(storedMessage);
+ return true;
+ }
+ });
+
+ StoredMessage<?> retrievedMessage = retrievedMessageRef.get();
+ assertNotNull("Message was not found", retrievedMessageRef);
+ assertEquals("Unexpected retreived message", message.getMessageNumber(), retrievedMessage.getMessageNumber());
+
+ retrievedMessage.remove();
+
+ retrievedMessageRef.set(null);
+ _store.visitMessages(new MessageHandler()
+ {
+
+ @Override
+ public boolean handle(StoredMessage<?> storedMessage)
+ {
+ retrievedMessageRef.set(storedMessage);
+ return true;
+ }
+ });
+ assertNull(retrievedMessageRef.get());
+ }
+
+
+ private TransactionLogResource createTransactionLogResource(UUID queueId)
+ {
+ TransactionLogResource queue = mock(TransactionLogResource.class);
+ when(queue.getId()).thenReturn(queueId);
+ when(queue.getName()).thenReturn("testQueue");
+ when(queue.isDurable()).thenReturn(true);
+ return queue;
+ }
+
+ private EnqueueableMessage createMockEnqueueableMessage(long messageId, final StoredMessage<TestMessageMetaData> message)
+ {
+ EnqueueableMessage enqueueableMessage = mock(EnqueueableMessage.class);
+ when(enqueueableMessage.isPersistent()).thenReturn(true);
+ when(enqueueableMessage.getMessageNumber()).thenReturn(messageId);
+ when(enqueueableMessage.getStoredMessage()).thenReturn(message);
+ return enqueueableMessage;
+ }
+
private Record getTestRecord(long messageNumber)
{
UUID queueId1 = UUIDGenerator.generateRandomUUID();
@@ -121,77 +382,66 @@ public abstract class MessageStoreTestCase extends QpidTestCase
EnqueueableMessage message1 = mock(EnqueueableMessage.class);
when(message1.isPersistent()).thenReturn(true);
when(message1.getMessageNumber()).thenReturn(messageNumber);
- final StoredMessage storedMessage = mock(StoredMessage.class);
+ final StoredMessage<?> storedMessage = mock(StoredMessage.class);
when(storedMessage.getMessageNumber()).thenReturn(messageNumber);
when(message1.getStoredMessage()).thenReturn(storedMessage);
Record enqueueRecord = new TestRecord(queue1, message1);
return enqueueRecord;
}
- private static class TestRecord implements Record
+ private EnqueueableMessage createEnqueueableMessage(long messageId1)
{
- private TransactionLogResource _queue;
- private EnqueueableMessage _message;
+ final StoredMessage<TestMessageMetaData> message1 = _store.addMessage(new TestMessageMetaData(messageId1, 0));
+ StoreFuture flushFuture = message1.flushToStore();
+ flushFuture.waitForCompletion();
+ EnqueueableMessage enqueueableMessage1 = createMockEnqueueableMessage(messageId1, message1);
+ return enqueueableMessage1;
+ }
- public TestRecord(TransactionLogResource queue, EnqueueableMessage message)
+ private class MessageMetaDataMatcher extends ArgumentMatcher<StoredMessage<?>>
+ {
+ private long _messageNumber;
+
+ public MessageMetaDataMatcher(long messageNumber)
{
super();
- _queue = queue;
- _message = message;
+ _messageNumber = messageNumber;
}
- @Override
- public TransactionLogResource getResource()
+ public boolean matches(Object obj)
{
- return _queue;
+ return obj instanceof StoredMessage && ((StoredMessage<?>)obj).getMessageNumber() == _messageNumber;
}
+ }
- @Override
- public EnqueueableMessage getMessage()
- {
- return _message;
- }
+ private class QueueFilteringMessageInstanceHandler implements MessageInstanceHandler
+ {
+ private final UUID _queueId;
+ private final Set<Long> _enqueuedIds = new HashSet<Long>();
- @Override
- public int hashCode()
+ public QueueFilteringMessageInstanceHandler(UUID queueId)
{
- final int prime = 31;
- int result = 1;
- result = prime * result + ((_message == null) ? 0 : new Long(_message.getMessageNumber()).hashCode());
- result = prime * result + ((_queue == null) ? 0 : _queue.getId().hashCode());
- return result;
+ _queueId = queueId;
}
@Override
- public boolean equals(Object obj)
+ public boolean handle(UUID queueId, long messageId)
{
- if (this == obj)
- {
- return true;
- }
- if (obj == null)
- {
- return false;
- }
- if (!(obj instanceof Record))
+ if (queueId.equals(_queueId))
{
- return false;
+ if (_enqueuedIds.contains(messageId))
+ {
+ fail("Queue with id " + _queueId + " contains duplicate message ids");
+ }
+ _enqueuedIds.add(messageId);
}
- Record other = (Record) obj;
- if (_message == null && other.getMessage() != null)
- {
- return false;
- }
- if (_queue == null && other.getResource() != null)
- {
- return false;
- }
- if (_message.getMessageNumber() != other.getMessage().getMessageNumber())
- {
- return false;
- }
- return _queue.getId().equals(other.getResource().getId());
+ return true;
}
+ public Set<Long> getEnqueuedIds()
+ {
+ return _enqueuedIds;
+ }
}
+
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
index 32df355c07..bfa4e1d52e 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
@@ -20,15 +20,30 @@
*/
package org.apache.qpid.server.store;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.server.store.handler.MessageHandler;
+
/** A simple message store that stores the messages in a thread-safe structure in memory. */
public class TestMemoryMessageStore extends AbstractMemoryMessageStore
{
public static final String TYPE = "TestMemory";
- @Override
- public String getStoreType()
+ public int getMessageCount()
{
- return TYPE;
+ final AtomicInteger counter = new AtomicInteger();
+ visitMessages(new MessageHandler()
+ {
+
+ @Override
+ public boolean handle(StoredMessage<?> storedMessage)
+ {
+ counter.incrementAndGet();
+ return true;
+ }
+ });
+ return counter.get();
}
+
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
index e14b41b221..6e55b468a6 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
@@ -34,13 +34,20 @@ public class TestMessageMetaData implements StorableMessageMetaData
private static final TestMessageMetaDataType TYPE = new TestMessageMetaDataType();
- private int _contentSize;
- private long _messageId;
+ private final int _contentSize;
+ private final long _messageId;
+ private final boolean _persistent;
public TestMessageMetaData(long messageId, int contentSize)
{
+ this(messageId, contentSize, true);
+ }
+
+ public TestMessageMetaData(long messageId, int contentSize, boolean persistent)
+ {
_contentSize = contentSize;
_messageId = messageId;
+ _persistent = persistent;
}
@Override
@@ -59,7 +66,7 @@ public class TestMessageMetaData implements StorableMessageMetaData
}
@Override
- public MessageMetaDataType getType()
+ public MessageMetaDataType<TestMessageMetaData> getType()
{
return TYPE;
}
@@ -67,7 +74,7 @@ public class TestMessageMetaData implements StorableMessageMetaData
@Override
public boolean isPersistent()
{
- return true;
+ return _persistent;
}
@Override
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
index 5622383f3f..e5c94cf66b 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
@@ -70,7 +70,22 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM
private static class TestServerMessage implements ServerMessage<TestMessageMetaData>
{
- private StoredMessage<TestMessageMetaData> _storedMsg;
+ private final StoredMessage<TestMessageMetaData> _storedMsg;
+
+ private final MessageReference<ServerMessage> _messageReference = new MessageReference<ServerMessage>()
+ {
+
+ @Override
+ public ServerMessage getMessage()
+ {
+ return TestServerMessage.this;
+ }
+
+ @Override
+ public void release()
+ {
+ }
+ };
public TestServerMessage(StoredMessage<TestMessageMetaData> storedMsg)
{
@@ -115,7 +130,7 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM
@Override
public long getMessageNumber()
{
- return 0;
+ return _storedMsg.getMessageNumber();
}
@Override
@@ -140,13 +155,54 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM
@Override
public boolean isPersistent()
{
- return false;
+ return _storedMsg.getMetaData().isPersistent();
}
@Override
public MessageReference newReference()
{
- return null;
+ return _messageReference;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((_storedMsg == null) ? 0 : _storedMsg.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ {
+ return true;
+ }
+ if (obj == null)
+ {
+ return false;
+ }
+ if (getClass() != obj.getClass())
+ {
+ return false;
+ }
+ TestServerMessage other = (TestServerMessage) obj;
+ if (_storedMsg == null)
+ {
+ if (other._storedMsg != null)
+ {
+ return false;
+ }
+ }
+ else if (!_storedMsg.equals(other._storedMsg))
+ {
+ return false;
+ }
+ return true;
}
+
+
}
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java
new file mode 100644
index 0000000000..668d9d5242
--- /dev/null
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.store.Transaction.Record;
+
+public class TestRecord implements Record
+{
+ private TransactionLogResource _queue;
+ private EnqueueableMessage _message;
+
+ public TestRecord(TransactionLogResource queue, EnqueueableMessage message)
+ {
+ super();
+ _queue = queue;
+ _message = message;
+ }
+
+ @Override
+ public TransactionLogResource getResource()
+ {
+ return _queue;
+ }
+
+ @Override
+ public EnqueueableMessage getMessage()
+ {
+ return _message;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((_message == null) ? 0 : new Long(_message.getMessageNumber()).hashCode());
+ result = prime * result + ((_queue == null) ? 0 : _queue.getId().hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ {
+ return true;
+ }
+ if (obj == null)
+ {
+ return false;
+ }
+ if (!(obj instanceof Record))
+ {
+ return false;
+ }
+ Record other = (Record) obj;
+ if (_message == null && other.getMessage() != null)
+ {
+ return false;
+ }
+ if (_queue == null && other.getResource() != null)
+ {
+ return false;
+ }
+ if (_message.getMessageNumber() != other.getMessage().getMessageNumber())
+ {
+ return false;
+ }
+ return _queue.getId().equals(other.getResource().getId());
+ }
+
+} \ No newline at end of file
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
deleted file mode 100644
index 7d4dcd0280..0000000000
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Adds some extra methods to the memory message store for testing purposes.
- */
-public class TestableMemoryMessageStore extends TestMemoryMessageStore
-{
- public static final String TYPE = "TestableMemory";
- private final Map<Long, AMQQueue> _messages = new HashMap<Long, AMQQueue>();
- private final AtomicInteger _messageCount = new AtomicInteger(0);
-
- @Override
- public StoredMessage addMessage(StorableMessageMetaData metaData)
- {
- return new TestableStoredMessage(super.addMessage(metaData));
- }
-
- public int getMessageCount()
- {
- return _messageCount.get();
- }
-
- public Map<Long, AMQQueue> getMessages()
- {
- return _messages;
- }
-
- private class TestableTransaction implements Transaction
- {
- @Override
- public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
- {
- getMessages().put(message.getMessageNumber(), (AMQQueue)queue);
- }
-
- @Override
- public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
- {
- getMessages().remove(message.getMessageNumber());
- }
-
- @Override
- public void commitTran()
- {
- }
-
- @Override
- public StoreFuture commitTranAsync()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- public void abortTran()
- {
- }
-
- public void removeXid(long format, byte[] globalId, byte[] branchId)
- {
- }
-
- public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
- {
- }
- }
-
-
- @Override
- public Transaction newTransaction()
- {
- return new TestableTransaction();
- }
-
-
- private class TestableStoredMessage implements StoredMessage
- {
- private final StoredMessage _storedMessage;
-
- public TestableStoredMessage(StoredMessage storedMessage)
- {
- _messageCount.incrementAndGet();
- _storedMessage = storedMessage;
- }
-
- public StorableMessageMetaData getMetaData()
- {
- return _storedMessage.getMetaData();
- }
-
- public long getMessageNumber()
- {
- return _storedMessage.getMessageNumber();
- }
-
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- _storedMessage.addContent(offsetInMessage, src);
- }
-
- public int getContent(int offsetInMessage, ByteBuffer dst)
- {
- return _storedMessage.getContent(offsetInMessage, dst);
- }
-
-
- public ByteBuffer getContent(int offsetInMessage, int size)
- {
- return _storedMessage.getContent(offsetInMessage, size);
- }
-
- public StoreFuture flushToStore()
- {
- return _storedMessage.flushToStore();
- }
-
- public void remove()
- {
- _storedMessage.remove();
- _messageCount.decrementAndGet();
- }
- }
-}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java
deleted file mode 100644
index ba9b7c155e..0000000000
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.server.store;
-
-import java.util.Map;
-
-import org.apache.qpid.server.plugin.MessageStoreFactory;
-
-public class TestableMemoryMessageStoreFactory implements MessageStoreFactory
-{
- @Override
- public String getType()
- {
- return TestableMemoryMessageStore.TYPE;
- }
-
- @Override
- public MessageStore createMessageStore()
- {
- return new TestableMemoryMessageStore();
- }
-
- @Override
- public void validateAttributes(Map<String, Object> attributes)
- {
- }
-
-}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
index ab18c8f41d..da868a01f1 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
@@ -33,14 +33,14 @@ import org.apache.qpid.server.util.ServerScopedRuntimeException;
* Mock implementation of a (Store) Transaction allow its state to be observed.
* Also provide a factory method to produce TestTransactionLog objects suitable
* for unit test use.
- *
+ *
*/
class MockStoreTransaction implements Transaction
{
enum TransactionState {NOT_STARTED, STARTED, COMMITTED, ABORTED};
private TransactionState _state = TransactionState.NOT_STARTED;
-
+
private int _numberOfEnqueuedMessages = 0;
private int _numberOfDequeuedMessages = 0;
private boolean _throwExceptionOnQueueOp;
@@ -52,7 +52,7 @@ class MockStoreTransaction implements Transaction
public void setState(TransactionState state)
{
- _state = state;
+ _state = state;
}
public TransactionState getState()
@@ -64,10 +64,10 @@ class MockStoreTransaction implements Transaction
{
if (_throwExceptionOnQueueOp)
{
-
+
throw new ServerScopedRuntimeException("Mocked exception");
}
-
+
_numberOfEnqueuedMessages++;
}
@@ -87,7 +87,7 @@ class MockStoreTransaction implements Transaction
{
throw new ServerScopedRuntimeException("Mocked exception");
}
-
+
_numberOfDequeuedMessages++;
}
@@ -124,12 +124,6 @@ class MockStoreTransaction implements Transaction
storeTransaction.setState(TransactionState.STARTED);
return storeTransaction;
}
-
- @Override
- public String getStoreType()
- {
- return "TEST";
- }
};
}
} \ No newline at end of file
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
index fd56f3fa1c..1b131a18e1 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
@@ -44,7 +44,6 @@ import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestMemoryMessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.plugin.PluggableFactoryLoader;
@@ -110,7 +109,7 @@ public class BrokerTestHelper
when(virtualHost.getAttribute(org.apache.qpid.server.model.VirtualHost.TYPE)).thenReturn(StandardVirtualHostFactory.TYPE);
Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
- messageStoreSettings.put(MessageStore.STORE_TYPE, TestableMemoryMessageStore.TYPE);
+ messageStoreSettings.put(MessageStore.STORE_TYPE, TestMemoryMessageStore.TYPE);
when(virtualHost.getMessageStoreSettings()).thenReturn(messageStoreSettings);
when(virtualHost.getName()).thenReturn(name);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java
new file mode 100644
index 0000000000..ce5616b9ca
--- /dev/null
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java
@@ -0,0 +1,414 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.NullMessageStore;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TestMessageMetaData;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.Transaction.Record;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+import org.apache.qpid.server.txn.DtxBranch;
+import org.apache.qpid.server.txn.DtxRegistry;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.transport.Xid;
+import org.mockito.ArgumentMatcher;
+
+public class MessageStoreRecovererTest extends TestCase
+{
+ private VirtualHost _virtualHost;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _virtualHost = mock(VirtualHost.class);
+ when(_virtualHost.getEventLogger()).thenReturn(new EventLogger());
+
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testRecoveryOfSingleMessageOnSingleQueue()
+ {
+ final AMQQueue<?> queue = createRegisteredMockQueue();
+
+ final long messageId = 1;
+ final StoredMessage<StorableMessageMetaData> storedMessage = createMockStoredMessage(messageId);
+
+ MessageStore store = new NullMessageStore()
+ {
+ @Override
+ public void visitMessages(MessageHandler handler) throws StoreException
+ {
+ handler.handle(storedMessage);
+ }
+
+ @Override
+ public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+ {
+ handler.handle(queue.getId(), messageId);
+ }
+ };
+
+ when(_virtualHost.getMessageStore()).thenReturn(store);
+
+ MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class));
+ recoverer.recover();
+
+ ServerMessage<?> message = storedMessage.getMetaData().getType().createMessage(storedMessage);
+ verify(queue, times(1)).enqueue(eq(message), (Action<? super MessageInstance>)isNull());
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testRecoveryOfMessageInstanceForNonExistingMessage()
+ {
+ final AMQQueue<?> queue = createRegisteredMockQueue();
+
+ final long messageId = 1;
+ final Transaction transaction = mock(Transaction.class);
+
+ MessageStore store = new NullMessageStore()
+ {
+ @Override
+ public void visitMessages(MessageHandler handler) throws StoreException
+ {
+ // no message to visit
+ }
+
+ @Override
+ public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+ {
+ handler.handle(queue.getId(), messageId);
+ }
+
+ @Override
+ public Transaction newTransaction()
+ {
+ return transaction;
+ }
+ };
+
+ when(_virtualHost.getMessageStore()).thenReturn(store);
+
+ MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class));
+ recoverer.recover();
+
+ verify(queue, never()).enqueue(any(ServerMessage.class), any(Action.class));
+ verify(transaction).dequeueMessage(same(queue), argThat(new MessageNumberMatcher(messageId)));
+ verify(transaction, times(1)).commitTranAsync();
+ }
+
+ public void testRecoveryOfMessageInstanceForNonExistingQueue()
+ {
+ final UUID queueId = UUID.randomUUID();
+ final Transaction transaction = mock(Transaction.class);
+ final long messageId = 1;
+ final StoredMessage<StorableMessageMetaData> storedMessage = createMockStoredMessage(messageId);
+
+ MessageStore store = new NullMessageStore()
+ {
+ @Override
+ public void visitMessages(MessageHandler handler) throws StoreException
+ {
+ handler.handle(storedMessage);
+ }
+
+ @Override
+ public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+ {
+ handler.handle(queueId, messageId);
+ }
+
+ @Override
+ public Transaction newTransaction()
+ {
+ return transaction;
+ }
+ };
+
+ when(_virtualHost.getMessageStore()).thenReturn(store);
+
+ MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class));
+ recoverer.recover();
+
+ verify(transaction).dequeueMessage(argThat(new QueueIdMatcher(queueId)), argThat(new MessageNumberMatcher(messageId)));
+ verify(transaction, times(1)).commitTranAsync();
+ }
+
+ public void testRecoveryDeletesOrphanMessages()
+ {
+
+ final long messageId = 1;
+ final StoredMessage<StorableMessageMetaData> storedMessage = createMockStoredMessage(messageId);
+
+ MessageStore store = new NullMessageStore()
+ {
+ @Override
+ public void visitMessages(MessageHandler handler) throws StoreException
+ {
+ handler.handle(storedMessage);
+ }
+
+ @Override
+ public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+ {
+ // No messages instances
+ }
+ };
+
+ when(_virtualHost.getMessageStore()).thenReturn(store);
+
+ MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class));
+ recoverer.recover();
+
+ verify(storedMessage, times(1)).remove();
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testRecoveryOfSingleEnqueueWithDistributedTransaction()
+ {
+ AMQQueue<?> queue = createRegisteredMockQueue();
+
+ final Transaction transaction = mock(Transaction.class);
+
+ final StoredMessage<StorableMessageMetaData> storedMessage = createMockStoredMessage(1);
+ long messageId = storedMessage.getMessageNumber();
+
+ EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, storedMessage);
+ Record enqueueRecord = createMockRecord(queue, enqueueableMessage);
+
+ final long format = 1;
+ final byte[] globalId = new byte[] {0};
+ final byte[] branchId = new byte[] {0};
+ final Record[] enqueues = { enqueueRecord };
+ final Record[] dequeues = {};
+
+ MessageStore store = new NullMessageStore()
+ {
+ @Override
+ public void visitMessages(MessageHandler handler) throws StoreException
+ {
+ handler.handle(storedMessage);
+ }
+
+ @Override
+ public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+ {
+ // No messages instances
+ }
+
+ @Override
+ public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+ {
+ handler.handle(format, globalId, branchId, enqueues, dequeues);
+ }
+
+ @Override
+ public Transaction newTransaction()
+ {
+ return transaction;
+ }
+ };
+
+ DtxRegistry dtxRegistry = new DtxRegistry();
+
+ when(_virtualHost.getMessageStore()).thenReturn(store);
+ when(_virtualHost.getDtxRegistry()).thenReturn(dtxRegistry);
+
+ MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class));
+ recoverer.recover();
+
+ DtxBranch branch = dtxRegistry.getBranch(new Xid(format, globalId, branchId));
+ assertNotNull("Expected dtx branch to be created", branch);
+ branch.commit();
+
+ ServerMessage<?> message = storedMessage.getMetaData().getType().createMessage(storedMessage);
+ verify(queue, times(1)).enqueue(eq(message), (Action<? super MessageInstance>)isNull());
+ verify(transaction).commitTran();
+ }
+
+ public void testRecoveryOfSingleDequeueWithDistributedTransaction()
+ {
+ final AMQQueue<?> queue = createRegisteredMockQueue();
+
+
+ final Transaction transaction = mock(Transaction.class);
+
+ final StoredMessage<StorableMessageMetaData> storedMessage = createMockStoredMessage(1);
+ final long messageId = storedMessage.getMessageNumber();
+
+ EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, storedMessage);
+ Record dequeueRecord = createMockRecord(queue, enqueueableMessage);
+
+ QueueEntry queueEntry = mock(QueueEntry.class);
+ when(queue.getMessageOnTheQueue(messageId)).thenReturn(queueEntry);
+
+ final long format = 1;
+ final byte[] globalId = new byte[] {0};
+ final byte[] branchId = new byte[] {0};
+ final Record[] enqueues = {};
+ final Record[] dequeues = { dequeueRecord };
+
+ MessageStore store = new NullMessageStore()
+ {
+ @Override
+ public void visitMessages(MessageHandler handler) throws StoreException
+ {
+ handler.handle(storedMessage);
+ }
+
+ @Override
+ public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+ {
+ // We need the message to be enqueued onto the queue so that later the distributed transaction
+ // can dequeue it.
+ handler.handle(queue.getId(), messageId);
+ }
+
+ @Override
+ public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+ {
+ handler.handle(format, globalId, branchId, enqueues, dequeues);
+ }
+
+ @Override
+ public Transaction newTransaction()
+ {
+ return transaction;
+ }
+ };
+
+ DtxRegistry dtxRegistry = new DtxRegistry();
+
+ when(_virtualHost.getMessageStore()).thenReturn(store);
+ when(_virtualHost.getDtxRegistry()).thenReturn(dtxRegistry);
+
+ MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class));
+ recoverer.recover();
+
+ DtxBranch branch = dtxRegistry.getBranch(new Xid(format, globalId, branchId));
+ assertNotNull("Expected dtx branch to be created", branch);
+ branch.commit();
+
+ verify(queueEntry, times(1)).delete();
+ verify(transaction).commitTran();
+ }
+
+
+ protected Record createMockRecord(AMQQueue<?> queue, EnqueueableMessage enqueueableMessage)
+ {
+ Record enqueueRecord = mock(Record.class);
+ when(enqueueRecord.getMessage()).thenReturn(enqueueableMessage);
+ when(enqueueRecord.getResource()).thenReturn(queue);
+ return enqueueRecord;
+ }
+
+ protected EnqueueableMessage createMockEnqueueableMessage(long messageId,
+ final StoredMessage<StorableMessageMetaData> storedMessage)
+ {
+ EnqueueableMessage enqueueableMessage = mock(EnqueueableMessage.class);
+ when(enqueueableMessage.getMessageNumber()).thenReturn(messageId);
+ when(enqueueableMessage.getStoredMessage()).thenReturn(storedMessage);
+ return enqueueableMessage;
+ }
+
+ private StoredMessage<StorableMessageMetaData> createMockStoredMessage(final long messageId)
+ {
+ TestMessageMetaData metaData = new TestMessageMetaData(messageId, 0);
+
+ @SuppressWarnings("unchecked")
+ final StoredMessage<StorableMessageMetaData> storedMessage = mock(StoredMessage.class);
+ when(storedMessage.getMessageNumber()).thenReturn(messageId);
+ when(storedMessage.getMetaData()).thenReturn(metaData);
+ return storedMessage;
+ }
+
+ private AMQQueue<?> createRegisteredMockQueue()
+ {
+ AMQQueue<?> queue = mock(AMQQueue.class);
+ final UUID queueId = UUID.randomUUID();
+ when(queue.getId()).thenReturn(queueId);
+ when(queue.getName()).thenReturn("test-queue");
+ when(_virtualHost.getQueue(queueId)).thenReturn(queue);
+ return queue;
+ }
+
+
+ private final class QueueIdMatcher extends ArgumentMatcher<TransactionLogResource>
+ {
+ private UUID _queueId;
+ public QueueIdMatcher(UUID queueId)
+ {
+ _queueId = queueId;
+ }
+
+ @Override
+ public boolean matches(Object argument)
+ {
+ return argument instanceof TransactionLogResource && _queueId.equals( ((TransactionLogResource)argument).getId() );
+ }
+ }
+
+ private final class MessageNumberMatcher extends ArgumentMatcher<EnqueueableMessage>
+ {
+ private final long _messageId;
+
+ private MessageNumberMatcher(long messageId)
+ {
+ _messageId = messageId;
+ }
+
+ @Override
+ public boolean matches(Object argument)
+ {
+ return argument instanceof EnqueueableMessage && ((EnqueueableMessage)argument).getMessageNumber() == _messageId;
+ }
+ }
+}