diff options
Diffstat (limited to 'qpid/java/broker-core/src/test/java')
15 files changed, 1033 insertions, 401 deletions
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java index 0fe9d1ac49..1de857d224 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java @@ -50,10 +50,10 @@ import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.SystemContext; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.test.utils.QpidTestCase; public class ManagementModeStoreHandlerTest extends QpidTestCase @@ -89,20 +89,22 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase when(_portEntry.getParents()).thenReturn(Collections.singletonMap(Broker.class.getSimpleName(), _root)); when(_portEntry.getType()).thenReturn(Port.class.getSimpleName()); - final ArgumentCaptor<ConfigurationRecoveryHandler> recovererArgumentCaptor = ArgumentCaptor.forClass(ConfigurationRecoveryHandler.class); + final ArgumentCaptor<ConfiguredObjectRecordHandler> recovererArgumentCaptor = ArgumentCaptor.forClass(ConfiguredObjectRecordHandler.class); doAnswer( new Answer() { @Override public Object answer(final InvocationOnMock invocation) throws Throwable { - ConfigurationRecoveryHandler recoverer = recovererArgumentCaptor.getValue(); - recoverer.configuredObject(_root); - recoverer.configuredObject(_portEntry); + ConfiguredObjectRecordHandler recoverer = recovererArgumentCaptor.getValue(); + if(recoverer.handle(_root)) + { + recoverer.handle(_portEntry); + } return null; } } - ).when(_store).recoverConfigurationStore(recovererArgumentCaptor.capture()); + ).when(_store).visitConfiguredObjectRecords(recovererArgumentCaptor.capture()); _options = new BrokerOptions(); _handler = new ManagementModeStoreHandler(_store, _options); @@ -112,21 +114,21 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase private ConfiguredObjectRecord getRootEntry() { BrokerFinder brokerFinder = new BrokerFinder(); - _handler.recoverConfigurationStore(brokerFinder); + _handler.visitConfiguredObjectRecords(brokerFinder); return brokerFinder.getBrokerRecord(); } private ConfiguredObjectRecord getEntry(UUID id) { RecordFinder recordFinder = new RecordFinder(id); - _handler.recoverConfigurationStore(recordFinder); + _handler.visitConfiguredObjectRecords(recordFinder); return recordFinder.getFoundRecord(); } private Collection<UUID> getChildrenIds(ConfiguredObjectRecord record) { ChildFinder childFinder = new ChildFinder(record); - _handler.recoverConfigurationStore(childFinder); + _handler.visitConfiguredObjectRecords(childFinder); return childFinder.getChildIds(); } @@ -288,21 +290,25 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase attributes.put(VirtualHost.TYPE, "STANDARD"); final ConfiguredObjectRecord virtualHost = new ConfiguredObjectRecordImpl(virtualHostId, VirtualHost.class.getSimpleName(), attributes, Collections.singletonMap(Broker.class.getSimpleName(), _root)); - final ArgumentCaptor<ConfigurationRecoveryHandler> recovererArgumentCaptor = ArgumentCaptor.forClass(ConfigurationRecoveryHandler.class); + final ArgumentCaptor<ConfiguredObjectRecordHandler> recovererArgumentCaptor = ArgumentCaptor.forClass(ConfiguredObjectRecordHandler.class); doAnswer( new Answer() { @Override public Object answer(final InvocationOnMock invocation) throws Throwable { - ConfigurationRecoveryHandler recoverer = recovererArgumentCaptor.getValue(); - recoverer.configuredObject(_root); - recoverer.configuredObject(_portEntry); - recoverer.configuredObject(virtualHost); + ConfiguredObjectRecordHandler recoverer = recovererArgumentCaptor.getValue(); + if(recoverer.handle(_root)) + { + if(recoverer.handle(_portEntry)) + { + recoverer.handle(virtualHost); + } + } return null; } } - ).when(_store).recoverConfigurationStore(recovererArgumentCaptor.capture()); + ).when(_store).visitConfiguredObjectRecords(recovererArgumentCaptor.capture()); State expectedState = mmQuiesceVhosts ? State.QUIESCED : null; if(mmQuiesceVhosts) @@ -457,28 +463,32 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase } - private class BrokerFinder implements ConfigurationRecoveryHandler + private class BrokerFinder implements ConfiguredObjectRecordHandler { private ConfiguredObjectRecord _brokerRecord; + private int _version; + @Override - public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion) + public void begin(final int configVersion) { - + _version = configVersion; } @Override - public void configuredObject(final ConfiguredObjectRecord object) + public boolean handle(final ConfiguredObjectRecord object) { if(object.getType().equals(Broker.class.getSimpleName())) { _brokerRecord = object; + return false; } + return true; } @Override - public int completeConfigurationRecovery() + public int end() { - return 0; + return _version; } public ConfiguredObjectRecord getBrokerRecord() @@ -487,10 +497,11 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase } } - private class RecordFinder implements ConfigurationRecoveryHandler + private class RecordFinder implements ConfiguredObjectRecordHandler { private final UUID _id; private ConfiguredObjectRecord _foundRecord; + private int _version; private RecordFinder(final UUID id) { @@ -498,24 +509,26 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase } @Override - public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion) + public void begin(final int configVersion) { - + _version = configVersion; } @Override - public void configuredObject(final ConfiguredObjectRecord object) + public boolean handle(final ConfiguredObjectRecord object) { if(object.getId().equals(_id)) { _foundRecord = object; + return false; } + return true; } @Override - public int completeConfigurationRecovery() + public int end() { - return 0; + return _version; } public ConfiguredObjectRecord getFoundRecord() @@ -524,10 +537,11 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase } } - private class ChildFinder implements ConfigurationRecoveryHandler + private class ChildFinder implements ConfiguredObjectRecordHandler { private final Collection<UUID> _childIds = new HashSet<UUID>(); private final ConfiguredObjectRecord _parent; + private int _version; private ChildFinder(final ConfiguredObjectRecord parent) { @@ -535,13 +549,13 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase } @Override - public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion) + public void begin(final int configVersion) { - + _version = configVersion; } @Override - public void configuredObject(final ConfiguredObjectRecord object) + public boolean handle(final ConfiguredObjectRecord object) { if(object.getParents() != null) @@ -555,12 +569,13 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase } } + return true; } @Override - public int completeConfigurationRecovery() + public int end() { - return 0; + return _version; } public Collection<UUID> getChildIds() diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java index 83052110a1..b38d9d7bd2 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -21,9 +21,7 @@ package org.apache.qpid.server.store; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -49,6 +47,7 @@ import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; import org.mockito.ArgumentCaptor; @@ -71,9 +70,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest private String _storePath; private String _storeName; - private ConfigurationRecoveryHandler _recoveryHandler; + private ConfiguredObjectRecordHandler _handler; - private ExchangeImpl _exchange = mock(ExchangeImpl.class); private static final String ROUTING_KEY = "routingKey"; private static final String QUEUE_NAME = "queueName"; private Map<String,Object> _bindingArgs; @@ -96,16 +94,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest FileUtils.delete(new File(_storePath), true); setTestSystemProperty("QPID_WORK", TMP_FOLDER); - _recoveryHandler = mock(ConfigurationRecoveryHandler.class); - when(_exchange.getName()).thenReturn(EXCHANGE_NAME); - when(_exchange.getId()).thenReturn(_exchangeId); - when(_exchange.getExchangeType()).thenReturn(mock(ExchangeType.class)); - when(_exchange.getEventLogger()).thenReturn(new EventLogger()); - - ConfiguredObjectRecord exchangeRecord = mock(ConfiguredObjectRecord.class); - when(exchangeRecord.getId()).thenReturn(_exchangeId); - when(exchangeRecord.getType()).thenReturn(Exchange.class.getSimpleName()); - when(_exchange.asObjectRecord()).thenReturn(exchangeRecord); + _handler = mock(ConfiguredObjectRecordHandler.class); + when(_handler.handle(any(ConfiguredObjectRecord.class))).thenReturn(true); _bindingArgs = new HashMap<String, Object>(); String argKey = AMQPFilterTypes.JMS_SELECTOR.toString(); @@ -134,7 +124,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest DurableConfigurationStoreHelper.createExchange(_configStore, exchange); reopenStore(); - verify(_recoveryHandler).configuredObject(matchesRecord(_exchangeId, EXCHANGE, + verify(_handler).handle(matchesRecord(_exchangeId, EXCHANGE, map( org.apache.qpid.server.model.Exchange.NAME, getName(), org.apache.qpid.server.model.Exchange.TYPE, getName()+"Type", org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name()))); @@ -168,14 +158,16 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest DurableConfigurationStoreHelper.removeExchange(_configStore, exchange); reopenStore(); - verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class)); + verify(_handler, never()).handle(any(ConfiguredObjectRecord.class)); } public void testBindQueue() throws Exception { + ExchangeImpl<?> exchange = createTestExchange(); AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null); BindingImpl binding = new BindingImpl(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue, - _exchange, _bindingArgs); + exchange, _bindingArgs); + DurableConfigurationStoreHelper.createExchange(_configStore, exchange); DurableConfigurationStoreHelper.createQueue(_configStore, queue); DurableConfigurationStoreHelper.createBinding(_configStore, binding); @@ -187,10 +179,10 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest Map<String,UUID> parents = new HashMap<String, UUID>(); - parents.put(Exchange.class.getSimpleName(), _exchange.getId()); + parents.put(Exchange.class.getSimpleName(), exchange.getId()); parents.put(Queue.class.getSimpleName(), queue.getId()); - verify(_recoveryHandler).configuredObject(matchesRecord(binding.getId(), BINDING, map, parents)); + verify(_handler).handle(matchesRecord(binding.getId(), BINDING, map, parents)); } @@ -260,15 +252,18 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public void testUnbindQueue() throws Exception { + ExchangeImpl<?> exchange = createTestExchange(); + DurableConfigurationStoreHelper.createExchange(_configStore, exchange); + AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null); BindingImpl binding = new BindingImpl(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue, - _exchange, _bindingArgs); + exchange, _bindingArgs); DurableConfigurationStoreHelper.createBinding(_configStore, binding); DurableConfigurationStoreHelper.removeBinding(_configStore, binding); reopenStore(); - verify(_recoveryHandler, never()).configuredObject(matchesRecord(ANY_UUID, BINDING, + verify(_handler, never()).handle(matchesRecord(ANY_UUID, BINDING, ANY_MAP)); } @@ -282,7 +277,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.NAME, getName()); queueAttributes.put(Queue.OWNER, getName()+"Owner"); queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name()); - verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes)); + verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes)); } public void testCreateQueueAMQQueueFieldTable() throws Exception @@ -304,7 +299,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name()); queueAttributes.putAll(attributes); - verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes)); + verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes)); } public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception @@ -322,7 +317,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name()); queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); - verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes)); + verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes)); } private ExchangeImpl createTestAlternateExchange() @@ -355,7 +350,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.NAME, getName()); queueAttributes.putAll(attributes); - verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes)); + verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes)); } @@ -382,7 +377,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.putAll(attributes); queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); - verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes)); + verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes)); } public void testRemoveQueue() throws Exception @@ -397,7 +392,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest // remove queue DurableConfigurationStoreHelper.removeQueue(_configStore,queue); reopenStore(); - verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class)); + verify(_handler, never()).handle(any(ConfiguredObjectRecord.class)); } private AMQQueue createTestQueue(String queueName, @@ -463,11 +458,9 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest { ExchangeImpl exchange = mock(ExchangeImpl.class); Map<String,Object> actualAttributes = new HashMap<String, Object>(); - actualAttributes.put("id", _exchangeId); actualAttributes.put("name", getName()); actualAttributes.put("type", getName() + "Type"); actualAttributes.put("lifetimePolicy", LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS); - when(exchange.getActualAttributes()).thenReturn(actualAttributes); when(exchange.getName()).thenReturn(getName()); when(exchange.getTypeName()).thenReturn(getName() + "Type"); when(exchange.isAutoDelete()).thenReturn(true); @@ -475,11 +468,10 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest ConfiguredObjectRecord exchangeRecord = mock(ConfiguredObjectRecord.class); when(exchangeRecord.getId()).thenReturn(_exchangeId); when(exchangeRecord.getType()).thenReturn(Exchange.class.getSimpleName()); - Map<String,Object> actualAttributesExceptId = new HashMap<String, Object>(actualAttributes); - actualAttributesExceptId.remove("id"); - when(exchangeRecord.getAttributes()).thenReturn(actualAttributesExceptId); + when(exchangeRecord.getAttributes()).thenReturn(actualAttributes); when(exchange.asObjectRecord()).thenReturn(exchangeRecord); - + when(exchange.getExchangeType()).thenReturn(mock(ExchangeType.class)); + when(exchange.getEventLogger()).thenReturn(new EventLogger()); return exchange; } @@ -491,7 +483,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest ConfiguredObject<?> parent = mock(ConfiguredObject.class); when(parent.getName()).thenReturn("testName"); _configStore.openConfigurationStore(parent, _configurationStoreSettings); - _configStore.recoverConfigurationStore(_recoveryHandler); + _configStore.visitConfiguredObjectRecords(_handler); } protected abstract DurableConfigurationStore createConfigStore() throws Exception; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java index 8f2d0029f6..2400a68c93 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java @@ -27,10 +27,4 @@ public class JsonFileConfigStoreConfigurationTest extends AbstractDurableConfigu { return new JsonFileConfigStore(); } - - @Override - public void testBindQueue() throws Exception - { - // TODO: Temporarily disable the test as it is already fixed on trunk - } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java index 1de24e371d..6907898a6c 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java @@ -28,6 +28,7 @@ import java.util.UUID; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.test.utils.TestFileUtils; @@ -43,15 +44,15 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; public class JsonFileConfigStoreTest extends QpidTestCase { - private final ConfigurationRecoveryHandler _recoveryHandler = mock(ConfigurationRecoveryHandler.class); - private JsonFileConfigStore _store; private HashMap<String, Object> _configurationStoreSettings; private ConfiguredObject<?> _virtualHost; private File _storeLocation; + private ConfiguredObjectRecordHandler _handler; private static final UUID ANY_UUID = UUID.randomUUID(); @@ -69,6 +70,9 @@ public class JsonFileConfigStoreTest extends QpidTestCase _configurationStoreSettings.put(JsonFileConfigStore.STORE_TYPE, JsonFileConfigStore.TYPE); _configurationStoreSettings.put(JsonFileConfigStore.STORE_PATH, _storeLocation.getAbsolutePath()); _store = new JsonFileConfigStore(); + + _handler = mock(ConfiguredObjectRecordHandler.class); + when(_handler.handle(any(ConfiguredObjectRecord.class))).thenReturn(true); } @Override @@ -113,35 +117,35 @@ public class JsonFileConfigStoreTest extends QpidTestCase } } - public void testStartFromNoStore() throws Exception + public void testVisitEmptyStore() { _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); - _store.recoverConfigurationStore(_recoveryHandler); - InOrder inorder = inOrder(_recoveryHandler); - inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(0)); - inorder.verify(_recoveryHandler,never()).configuredObject(any(ConfiguredObjectRecord.class)); - inorder.verify(_recoveryHandler).completeConfigurationRecovery(); + _store.visitConfiguredObjectRecords(_handler); + InOrder inorder = inOrder(_handler); + inorder.verify(_handler).begin(eq(0)); + inorder.verify(_handler,never()).handle(any(ConfiguredObjectRecord.class)); + inorder.verify(_handler).end(); _store.closeConfigurationStore(); } public void testUpdatedConfigVersionIsRetained() throws Exception { final int NEW_CONFIG_VERSION = 42; - when(_recoveryHandler.completeConfigurationRecovery()).thenReturn(NEW_CONFIG_VERSION); + when(_handler.end()).thenReturn(NEW_CONFIG_VERSION); _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); - _store.recoverConfigurationStore(_recoveryHandler); + _store.visitConfiguredObjectRecords(_handler); _store.closeConfigurationStore(); _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); - _store.recoverConfigurationStore(_recoveryHandler); - InOrder inorder = inOrder(_recoveryHandler); + _store.visitConfiguredObjectRecords(_handler); + InOrder inorder = inOrder(_handler); // first time the config version should be the initial version - 0 - inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(0)); + inorder.verify(_handler).begin(eq(0)); // second time the config version should be the updated version - inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(NEW_CONFIG_VERSION)); + inorder.verify(_handler).begin(eq(NEW_CONFIG_VERSION)); _store.closeConfigurationStore(); } @@ -157,8 +161,9 @@ public class JsonFileConfigStoreTest extends QpidTestCase _store.closeConfigurationStore(); _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); - _store.recoverConfigurationStore(_recoveryHandler); - verify(_recoveryHandler).configuredObject(matchesRecord(queueId, queueType, queueAttr)); + + _store.visitConfiguredObjectRecords(_handler); + verify(_handler, times(1)).handle(matchesRecord(queueId, queueType, queueAttr)); _store.closeConfigurationStore(); } @@ -179,8 +184,8 @@ public class JsonFileConfigStoreTest extends QpidTestCase _store.closeConfigurationStore(); _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); - _store.recoverConfigurationStore(_recoveryHandler); - verify(_recoveryHandler).configuredObject(matchesRecord(queueId, queueType, queueAttr)); + _store.visitConfiguredObjectRecords(_handler); + verify(_handler, times(1)).handle(matchesRecord(queueId, queueType, queueAttr)); _store.closeConfigurationStore(); } @@ -201,8 +206,8 @@ public class JsonFileConfigStoreTest extends QpidTestCase _store.closeConfigurationStore(); _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); - _store.recoverConfigurationStore(_recoveryHandler); - verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class)); + _store.visitConfiguredObjectRecords(_handler); + verify(_handler, never()).handle(any(ConfiguredObjectRecord.class)); _store.closeConfigurationStore(); } @@ -311,12 +316,12 @@ public class JsonFileConfigStoreTest extends QpidTestCase _store.update(true, bindingRecord, binding2Record); _store.closeConfigurationStore(); _store.openConfigurationStore(_virtualHost, _configurationStoreSettings); - _store.recoverConfigurationStore(_recoveryHandler); - verify(_recoveryHandler).configuredObject(matchesRecord(queueId, "Queue", EMPTY_ATTR)); - verify(_recoveryHandler).configuredObject(matchesRecord(queue2Id, "Queue", EMPTY_ATTR)); - verify(_recoveryHandler).configuredObject(matchesRecord(exchangeId, "Exchange", EMPTY_ATTR)); - verify(_recoveryHandler).configuredObject(matchesRecord(bindingId, "Binding", EMPTY_ATTR)); - verify(_recoveryHandler).configuredObject(matchesRecord(binding2Id, "Binding", EMPTY_ATTR)); + _store.visitConfiguredObjectRecords(_handler); + verify(_handler).handle(matchesRecord(queueId, "Queue", EMPTY_ATTR)); + verify(_handler).handle(matchesRecord(queue2Id, "Queue", EMPTY_ATTR)); + verify(_handler).handle(matchesRecord(exchangeId, "Exchange", EMPTY_ATTR)); + verify(_handler).handle(matchesRecord(bindingId, "Binding", EMPTY_ATTR)); + verify(_handler).handle(matchesRecord(binding2Id, "Binding", EMPTY_ATTR)); _store.closeConfigurationStore(); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java index 451a2744c3..89fef15e7e 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java @@ -33,7 +33,6 @@ import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; @@ -66,12 +65,9 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple _store = createStore(); - MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class); - when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class)); ConfiguredObject<?> parent = mock(ConfiguredObject.class); when(parent.getName()).thenReturn("test"); _store.openMessageStore(parent, storeSettings); - _store.recoverMessageStore(recoveryHandler, null); _transactionResource = UUID.randomUUID(); _events = new ArrayList<Event>(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java index 51d3fc15d2..8bf981bd7b 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java @@ -21,29 +21,31 @@ package org.apache.qpid.server.store; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; import org.apache.qpid.server.store.Transaction.Record; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; import org.apache.qpid.test.utils.QpidTestCase; +import org.mockito.ArgumentMatcher; public abstract class MessageStoreTestCase extends QpidTestCase { - private MessageStoreRecoveryHandler _messageStoreRecoveryHandler; - private StoredMessageRecoveryHandler _storedMessageRecoveryHandler; - private TransactionLogRecoveryHandler _logRecoveryHandler; - private TransactionLogRecoveryHandler.QueueEntryRecoveryHandler _queueEntryRecoveryHandler; - private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler _dtxRecordRecoveryHandler; - private MessageStore _store; private Map<String, Object> _storeSettings; private ConfiguredObject<?> _parent; @@ -55,35 +57,34 @@ public abstract class MessageStoreTestCase extends QpidTestCase _parent = mock(ConfiguredObject.class); when(_parent.getName()).thenReturn("test"); - _storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class); - _logRecoveryHandler = mock(TransactionLogRecoveryHandler.class); - _messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class); - _queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class); - _dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class); - - when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler); - when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler); - when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler); - _storeSettings = getStoreSettings(); _store = createMessageStore(); _store.openMessageStore(_parent, _storeSettings); - _store.recoverMessageStore(_messageStoreRecoveryHandler, _logRecoveryHandler); + } protected abstract Map<String, Object> getStoreSettings() throws Exception; protected abstract MessageStore createMessageStore(); - public MessageStore getStore() + protected MessageStore getStore() { return _store; } - public void testRecordXid() throws Exception + protected void reopenStore() throws Exception { + _store.closeMessageStore(); + + _store = createMessageStore(); + _store.openMessageStore(_parent, _storeSettings); + } + + public void testAddAndRemoveRecordXid() throws Exception + { + long format = 1l; Record enqueueRecord = getTestRecord(1); Record dequeueRecord = getTestRecord(2); Record[] enqueues = { enqueueRecord }; @@ -92,27 +93,287 @@ public abstract class MessageStoreTestCase extends QpidTestCase byte[] branchId = new byte[] { 2 }; Transaction transaction = _store.newTransaction(); - transaction.recordXid(1l, globalId, branchId, enqueues, dequeues); + transaction.recordXid(format, globalId, branchId, enqueues, dequeues); transaction.commitTran(); + reopenStore(); - verify(_dtxRecordRecoveryHandler).dtxRecord(1l, globalId, branchId, enqueues, dequeues); + + DistributedTransactionHandler handler = mock(DistributedTransactionHandler.class); + _store.visitDistributedTransactions(handler); + verify(handler, times(1)).handle(format,globalId, branchId, enqueues, dequeues); transaction = _store.newTransaction(); transaction.removeXid(1l, globalId, branchId); transaction.commitTran(); reopenStore(); - verify(_dtxRecordRecoveryHandler, times(1)).dtxRecord(1l, globalId, branchId, enqueues, dequeues); + + handler = mock(DistributedTransactionHandler.class); + _store.visitDistributedTransactions(handler); + verify(handler, never()).handle(format,globalId, branchId, enqueues, dequeues); } - private void reopenStore() throws Exception + public void testVisitMessages() throws Exception { - _store.closeMessageStore(); + long messageId = 1; + int contentSize = 0; + final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)); + StoreFuture flushFuture = message.flushToStore(); + flushFuture.waitForCompletion(); - _store = createMessageStore(); - _store.openMessageStore(_parent, _storeSettings); - _store.recoverMessageStore(_messageStoreRecoveryHandler, _logRecoveryHandler); + MessageHandler handler = mock(MessageHandler.class); + _store.visitMessages(handler); + + verify(handler, times(1)).handle(argThat(new MessageMetaDataMatcher(messageId))); + + } + + public void testVisitMessagesAborted() throws Exception + { + int contentSize = 0; + for (int i = 0; i < 3; i++) + { + final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize)); + StoreFuture flushFuture = message.flushToStore(); + flushFuture.waitForCompletion(); + } + + MessageHandler handler = mock(MessageHandler.class); + when(handler.handle(any(StoredMessage.class))).thenReturn(true, false); + + _store.visitMessages(handler); + + verify(handler, times(2)).handle(any(StoredMessage.class)); + } + + public void testReopenedMessageStoreUsesLastMessageId() throws Exception + { + int contentSize = 0; + for (int i = 0; i < 3; i++) + { + final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize)); + StoreFuture flushFuture = message.flushToStore(); + flushFuture.waitForCompletion(); + } + + reopenStore(); + + final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(4, contentSize)); + + StoreFuture flushFuture = message.flushToStore(); + flushFuture.waitForCompletion(); + + assertEquals("Unexpected message id", 4, message.getMessageNumber()); + } + + public void testVisitMessageInstances() throws Exception + { + long messageId = 1; + int contentSize = 0; + final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)); + StoreFuture flushFuture = message.flushToStore(); + flushFuture.waitForCompletion(); + + EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, message); + + UUID queueId = UUID.randomUUID(); + TransactionLogResource queue = createTransactionLogResource(queueId); + + Transaction transaction = _store.newTransaction(); + transaction.enqueueMessage(queue, enqueueableMessage); + transaction.commitTran(); + + MessageInstanceHandler handler = mock(MessageInstanceHandler.class); + _store.visitMessageInstances(handler); + + verify(handler, times(1)).handle(queueId, messageId); + } + + public void testVisitDistributedTransactions() throws Exception + { + long format = 1l; + byte[] branchId = new byte[] { 2 }; + byte[] globalId = new byte[] { 1 }; + Record enqueueRecord = getTestRecord(1); + Record dequeueRecord = getTestRecord(2); + Record[] enqueues = { enqueueRecord }; + Record[] dequeues = { dequeueRecord }; + + Transaction transaction = _store.newTransaction(); + transaction.recordXid(format, globalId, branchId, enqueues, dequeues); + transaction.commitTran(); + + DistributedTransactionHandler handler = mock(DistributedTransactionHandler.class); + _store.visitDistributedTransactions(handler); + + verify(handler, times(1)).handle(format,globalId, branchId, enqueues, dequeues); + + } + + public void testCommitTransaction() throws Exception + { + final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); + TransactionLogResource mockQueue = createTransactionLogResource(mockQueueId); + + Transaction txn = getStore().newTransaction(); + + long messageId1 = 1L; + long messageId2 = 5L; + final EnqueueableMessage enqueueableMessage1 = createEnqueueableMessage(messageId1); + final EnqueueableMessage enqueueableMessage2 = createEnqueueableMessage(messageId2); + + txn.enqueueMessage(mockQueue, enqueueableMessage1); + txn.enqueueMessage(mockQueue, enqueueableMessage2); + txn.commitTran(); + + QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId); + getStore().visitMessageInstances(filter); + Set<Long> enqueuedIds = filter.getEnqueuedIds(); + + assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); + assertTrue("Message with id " + messageId1 + " is not found", enqueuedIds.contains(messageId1)); + assertTrue("Message with id " + messageId2 + " is not found", enqueuedIds.contains(messageId2)); } + + public void testRollbackTransactionBeforeCommit() throws Exception + { + final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); + TransactionLogResource mockQueue = createTransactionLogResource(mockQueueId); + + long messageId1 = 21L; + long messageId2 = 22L; + long messageId3 = 23L; + final EnqueueableMessage enqueueableMessage1 = createEnqueueableMessage(messageId1); + final EnqueueableMessage enqueueableMessage2 = createEnqueueableMessage(messageId2); + final EnqueueableMessage enqueueableMessage3 = createEnqueueableMessage(messageId3); + + Transaction txn = getStore().newTransaction(); + + txn.enqueueMessage(mockQueue, enqueueableMessage1); + txn.abortTran(); + + txn = getStore().newTransaction(); + txn.enqueueMessage(mockQueue, enqueueableMessage2); + txn.enqueueMessage(mockQueue, enqueueableMessage3); + txn.commitTran(); + + QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId); + getStore().visitMessageInstances(filter); + Set<Long> enqueuedIds = filter.getEnqueuedIds(); + + assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); + assertTrue("Message with id " + messageId2 + " is not found", enqueuedIds.contains(messageId2)); + assertTrue("Message with id " + messageId3 + " is not found", enqueuedIds.contains(messageId3)); + } + + public void testRollbackTransactionAfterCommit() throws Exception + { + final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); + TransactionLogResource mockQueue = createTransactionLogResource(mockQueueId); + + long messageId1 = 30L; + long messageId2 = 31L; + long messageId3 = 32L; + + final EnqueueableMessage enqueueableMessage1 = createEnqueueableMessage(messageId1); + final EnqueueableMessage enqueueableMessage2 = createEnqueueableMessage(messageId2); + final EnqueueableMessage enqueueableMessage3 = createEnqueueableMessage(messageId3); + + Transaction txn = getStore().newTransaction(); + + txn.enqueueMessage(mockQueue, enqueueableMessage1); + txn.commitTran(); + + txn = getStore().newTransaction(); + txn.enqueueMessage(mockQueue, enqueueableMessage2); + txn.abortTran(); + + txn = getStore().newTransaction(); + txn.enqueueMessage(mockQueue, enqueueableMessage3); + txn.commitTran(); + + QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId); + getStore().visitMessageInstances(filter); + Set<Long> enqueuedIds = filter.getEnqueuedIds(); + + assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); + assertTrue("Message with id " + messageId1 + " is not found", enqueuedIds.contains(messageId1)); + assertTrue("Message with id " + messageId3 + " is not found", enqueuedIds.contains(messageId3)); + } + + public void testStoreIgnoresTransientMessage() throws Exception + { + long messageId = 1; + int contentSize = 0; + final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize, false)); + StoreFuture flushFuture = message.flushToStore(); + flushFuture.waitForCompletion(); + + MessageHandler handler = mock(MessageHandler.class); + _store.visitMessages(handler); + + verify(handler, times(0)).handle(argThat(new MessageMetaDataMatcher(messageId))); + } + + public void testAddAndRemoveMessageWithoutContent() throws Exception + { + long messageId = 1; + int contentSize = 0; + final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)); + StoreFuture flushFuture = message.flushToStore(); + flushFuture.waitForCompletion(); + + final AtomicReference<StoredMessage<?>> retrievedMessageRef = new AtomicReference<StoredMessage<?>>(); + _store.visitMessages(new MessageHandler() + { + + @Override + public boolean handle(StoredMessage<?> storedMessage) + { + retrievedMessageRef.set(storedMessage); + return true; + } + }); + + StoredMessage<?> retrievedMessage = retrievedMessageRef.get(); + assertNotNull("Message was not found", retrievedMessageRef); + assertEquals("Unexpected retreived message", message.getMessageNumber(), retrievedMessage.getMessageNumber()); + + retrievedMessage.remove(); + + retrievedMessageRef.set(null); + _store.visitMessages(new MessageHandler() + { + + @Override + public boolean handle(StoredMessage<?> storedMessage) + { + retrievedMessageRef.set(storedMessage); + return true; + } + }); + assertNull(retrievedMessageRef.get()); + } + + + private TransactionLogResource createTransactionLogResource(UUID queueId) + { + TransactionLogResource queue = mock(TransactionLogResource.class); + when(queue.getId()).thenReturn(queueId); + when(queue.getName()).thenReturn("testQueue"); + when(queue.isDurable()).thenReturn(true); + return queue; + } + + private EnqueueableMessage createMockEnqueueableMessage(long messageId, final StoredMessage<TestMessageMetaData> message) + { + EnqueueableMessage enqueueableMessage = mock(EnqueueableMessage.class); + when(enqueueableMessage.isPersistent()).thenReturn(true); + when(enqueueableMessage.getMessageNumber()).thenReturn(messageId); + when(enqueueableMessage.getStoredMessage()).thenReturn(message); + return enqueueableMessage; + } + private Record getTestRecord(long messageNumber) { UUID queueId1 = UUIDGenerator.generateRandomUUID(); @@ -121,77 +382,66 @@ public abstract class MessageStoreTestCase extends QpidTestCase EnqueueableMessage message1 = mock(EnqueueableMessage.class); when(message1.isPersistent()).thenReturn(true); when(message1.getMessageNumber()).thenReturn(messageNumber); - final StoredMessage storedMessage = mock(StoredMessage.class); + final StoredMessage<?> storedMessage = mock(StoredMessage.class); when(storedMessage.getMessageNumber()).thenReturn(messageNumber); when(message1.getStoredMessage()).thenReturn(storedMessage); Record enqueueRecord = new TestRecord(queue1, message1); return enqueueRecord; } - private static class TestRecord implements Record + private EnqueueableMessage createEnqueueableMessage(long messageId1) { - private TransactionLogResource _queue; - private EnqueueableMessage _message; + final StoredMessage<TestMessageMetaData> message1 = _store.addMessage(new TestMessageMetaData(messageId1, 0)); + StoreFuture flushFuture = message1.flushToStore(); + flushFuture.waitForCompletion(); + EnqueueableMessage enqueueableMessage1 = createMockEnqueueableMessage(messageId1, message1); + return enqueueableMessage1; + } - public TestRecord(TransactionLogResource queue, EnqueueableMessage message) + private class MessageMetaDataMatcher extends ArgumentMatcher<StoredMessage<?>> + { + private long _messageNumber; + + public MessageMetaDataMatcher(long messageNumber) { super(); - _queue = queue; - _message = message; + _messageNumber = messageNumber; } - @Override - public TransactionLogResource getResource() + public boolean matches(Object obj) { - return _queue; + return obj instanceof StoredMessage && ((StoredMessage<?>)obj).getMessageNumber() == _messageNumber; } + } - @Override - public EnqueueableMessage getMessage() - { - return _message; - } + private class QueueFilteringMessageInstanceHandler implements MessageInstanceHandler + { + private final UUID _queueId; + private final Set<Long> _enqueuedIds = new HashSet<Long>(); - @Override - public int hashCode() + public QueueFilteringMessageInstanceHandler(UUID queueId) { - final int prime = 31; - int result = 1; - result = prime * result + ((_message == null) ? 0 : new Long(_message.getMessageNumber()).hashCode()); - result = prime * result + ((_queue == null) ? 0 : _queue.getId().hashCode()); - return result; + _queueId = queueId; } @Override - public boolean equals(Object obj) + public boolean handle(UUID queueId, long messageId) { - if (this == obj) - { - return true; - } - if (obj == null) - { - return false; - } - if (!(obj instanceof Record)) + if (queueId.equals(_queueId)) { - return false; + if (_enqueuedIds.contains(messageId)) + { + fail("Queue with id " + _queueId + " contains duplicate message ids"); + } + _enqueuedIds.add(messageId); } - Record other = (Record) obj; - if (_message == null && other.getMessage() != null) - { - return false; - } - if (_queue == null && other.getResource() != null) - { - return false; - } - if (_message.getMessageNumber() != other.getMessage().getMessageNumber()) - { - return false; - } - return _queue.getId().equals(other.getResource().getId()); + return true; } + public Set<Long> getEnqueuedIds() + { + return _enqueuedIds; + } } + } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java index 32df355c07..bfa4e1d52e 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java @@ -20,15 +20,30 @@ */ package org.apache.qpid.server.store; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.qpid.server.store.handler.MessageHandler; + /** A simple message store that stores the messages in a thread-safe structure in memory. */ public class TestMemoryMessageStore extends AbstractMemoryMessageStore { public static final String TYPE = "TestMemory"; - @Override - public String getStoreType() + public int getMessageCount() { - return TYPE; + final AtomicInteger counter = new AtomicInteger(); + visitMessages(new MessageHandler() + { + + @Override + public boolean handle(StoredMessage<?> storedMessage) + { + counter.incrementAndGet(); + return true; + } + }); + return counter.get(); } + } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java index e14b41b221..6e55b468a6 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java @@ -34,13 +34,20 @@ public class TestMessageMetaData implements StorableMessageMetaData private static final TestMessageMetaDataType TYPE = new TestMessageMetaDataType(); - private int _contentSize; - private long _messageId; + private final int _contentSize; + private final long _messageId; + private final boolean _persistent; public TestMessageMetaData(long messageId, int contentSize) { + this(messageId, contentSize, true); + } + + public TestMessageMetaData(long messageId, int contentSize, boolean persistent) + { _contentSize = contentSize; _messageId = messageId; + _persistent = persistent; } @Override @@ -59,7 +66,7 @@ public class TestMessageMetaData implements StorableMessageMetaData } @Override - public MessageMetaDataType getType() + public MessageMetaDataType<TestMessageMetaData> getType() { return TYPE; } @@ -67,7 +74,7 @@ public class TestMessageMetaData implements StorableMessageMetaData @Override public boolean isPersistent() { - return true; + return _persistent; } @Override diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java index 5622383f3f..e5c94cf66b 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java @@ -70,7 +70,22 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM private static class TestServerMessage implements ServerMessage<TestMessageMetaData> { - private StoredMessage<TestMessageMetaData> _storedMsg; + private final StoredMessage<TestMessageMetaData> _storedMsg; + + private final MessageReference<ServerMessage> _messageReference = new MessageReference<ServerMessage>() + { + + @Override + public ServerMessage getMessage() + { + return TestServerMessage.this; + } + + @Override + public void release() + { + } + }; public TestServerMessage(StoredMessage<TestMessageMetaData> storedMsg) { @@ -115,7 +130,7 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM @Override public long getMessageNumber() { - return 0; + return _storedMsg.getMessageNumber(); } @Override @@ -140,13 +155,54 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM @Override public boolean isPersistent() { - return false; + return _storedMsg.getMetaData().isPersistent(); } @Override public MessageReference newReference() { - return null; + return _messageReference; + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = 1; + result = prime * result + ((_storedMsg == null) ? 0 : _storedMsg.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + { + return true; + } + if (obj == null) + { + return false; + } + if (getClass() != obj.getClass()) + { + return false; + } + TestServerMessage other = (TestServerMessage) obj; + if (_storedMsg == null) + { + if (other._storedMsg != null) + { + return false; + } + } + else if (!_storedMsg.equals(other._storedMsg)) + { + return false; + } + return true; } + + } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java new file mode 100644 index 0000000000..668d9d5242 --- /dev/null +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java @@ -0,0 +1,91 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.store.Transaction.Record; + +public class TestRecord implements Record +{ + private TransactionLogResource _queue; + private EnqueueableMessage _message; + + public TestRecord(TransactionLogResource queue, EnqueueableMessage message) + { + super(); + _queue = queue; + _message = message; + } + + @Override + public TransactionLogResource getResource() + { + return _queue; + } + + @Override + public EnqueueableMessage getMessage() + { + return _message; + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = 1; + result = prime * result + ((_message == null) ? 0 : new Long(_message.getMessageNumber()).hashCode()); + result = prime * result + ((_queue == null) ? 0 : _queue.getId().hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + { + return true; + } + if (obj == null) + { + return false; + } + if (!(obj instanceof Record)) + { + return false; + } + Record other = (Record) obj; + if (_message == null && other.getMessage() != null) + { + return false; + } + if (_queue == null && other.getResource() != null) + { + return false; + } + if (_message.getMessageNumber() != other.getMessage().getMessageNumber()) + { + return false; + } + return _queue.getId().equals(other.getResource().getId()); + } + +}
\ No newline at end of file diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java deleted file mode 100644 index 7d4dcd0280..0000000000 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.queue.AMQQueue; - -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Adds some extra methods to the memory message store for testing purposes. - */ -public class TestableMemoryMessageStore extends TestMemoryMessageStore -{ - public static final String TYPE = "TestableMemory"; - private final Map<Long, AMQQueue> _messages = new HashMap<Long, AMQQueue>(); - private final AtomicInteger _messageCount = new AtomicInteger(0); - - @Override - public StoredMessage addMessage(StorableMessageMetaData metaData) - { - return new TestableStoredMessage(super.addMessage(metaData)); - } - - public int getMessageCount() - { - return _messageCount.get(); - } - - public Map<Long, AMQQueue> getMessages() - { - return _messages; - } - - private class TestableTransaction implements Transaction - { - @Override - public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) - { - getMessages().put(message.getMessageNumber(), (AMQQueue)queue); - } - - @Override - public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) - { - getMessages().remove(message.getMessageNumber()); - } - - @Override - public void commitTran() - { - } - - @Override - public StoreFuture commitTranAsync() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - public void abortTran() - { - } - - public void removeXid(long format, byte[] globalId, byte[] branchId) - { - } - - public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) - { - } - } - - - @Override - public Transaction newTransaction() - { - return new TestableTransaction(); - } - - - private class TestableStoredMessage implements StoredMessage - { - private final StoredMessage _storedMessage; - - public TestableStoredMessage(StoredMessage storedMessage) - { - _messageCount.incrementAndGet(); - _storedMessage = storedMessage; - } - - public StorableMessageMetaData getMetaData() - { - return _storedMessage.getMetaData(); - } - - public long getMessageNumber() - { - return _storedMessage.getMessageNumber(); - } - - public void addContent(int offsetInMessage, ByteBuffer src) - { - _storedMessage.addContent(offsetInMessage, src); - } - - public int getContent(int offsetInMessage, ByteBuffer dst) - { - return _storedMessage.getContent(offsetInMessage, dst); - } - - - public ByteBuffer getContent(int offsetInMessage, int size) - { - return _storedMessage.getContent(offsetInMessage, size); - } - - public StoreFuture flushToStore() - { - return _storedMessage.flushToStore(); - } - - public void remove() - { - _storedMessage.remove(); - _messageCount.decrementAndGet(); - } - } -} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java deleted file mode 100644 index ba9b7c155e..0000000000 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.store; - -import java.util.Map; - -import org.apache.qpid.server.plugin.MessageStoreFactory; - -public class TestableMemoryMessageStoreFactory implements MessageStoreFactory -{ - @Override - public String getType() - { - return TestableMemoryMessageStore.TYPE; - } - - @Override - public MessageStore createMessageStore() - { - return new TestableMemoryMessageStore(); - } - - @Override - public void validateAttributes(Map<String, Object> attributes) - { - } - -} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java index ab18c8f41d..da868a01f1 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java @@ -33,14 +33,14 @@ import org.apache.qpid.server.util.ServerScopedRuntimeException; * Mock implementation of a (Store) Transaction allow its state to be observed. * Also provide a factory method to produce TestTransactionLog objects suitable * for unit test use. - * + * */ class MockStoreTransaction implements Transaction { enum TransactionState {NOT_STARTED, STARTED, COMMITTED, ABORTED}; private TransactionState _state = TransactionState.NOT_STARTED; - + private int _numberOfEnqueuedMessages = 0; private int _numberOfDequeuedMessages = 0; private boolean _throwExceptionOnQueueOp; @@ -52,7 +52,7 @@ class MockStoreTransaction implements Transaction public void setState(TransactionState state) { - _state = state; + _state = state; } public TransactionState getState() @@ -64,10 +64,10 @@ class MockStoreTransaction implements Transaction { if (_throwExceptionOnQueueOp) { - + throw new ServerScopedRuntimeException("Mocked exception"); } - + _numberOfEnqueuedMessages++; } @@ -87,7 +87,7 @@ class MockStoreTransaction implements Transaction { throw new ServerScopedRuntimeException("Mocked exception"); } - + _numberOfDequeuedMessages++; } @@ -124,12 +124,6 @@ class MockStoreTransaction implements Transaction storeTransaction.setState(TransactionState.STARTED); return storeTransaction; } - - @Override - public String getStoreType() - { - return "TEST"; - } }; } }
\ No newline at end of file diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index fd56f3fa1c..1b131a18e1 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -44,7 +44,6 @@ import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestMemoryMessageStore; -import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.plugin.PluggableFactoryLoader; @@ -110,7 +109,7 @@ public class BrokerTestHelper when(virtualHost.getAttribute(org.apache.qpid.server.model.VirtualHost.TYPE)).thenReturn(StandardVirtualHostFactory.TYPE); Map<String, Object> messageStoreSettings = new HashMap<String, Object>(); - messageStoreSettings.put(MessageStore.STORE_TYPE, TestableMemoryMessageStore.TYPE); + messageStoreSettings.put(MessageStore.STORE_TYPE, TestMemoryMessageStore.TYPE); when(virtualHost.getMessageStoreSettings()).thenReturn(messageStoreSettings); when(virtualHost.getName()).thenReturn(name); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java new file mode 100644 index 0000000000..ce5616b9ca --- /dev/null +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java @@ -0,0 +1,414 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isNull; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.UUID; + +import junit.framework.TestCase; + +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; +import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.NullMessageStore; +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.TestMessageMetaData; +import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.store.Transaction.Record; +import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; +import org.apache.qpid.server.txn.DtxBranch; +import org.apache.qpid.server.txn.DtxRegistry; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.transport.Xid; +import org.mockito.ArgumentMatcher; + +public class MessageStoreRecovererTest extends TestCase +{ + private VirtualHost _virtualHost; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + + _virtualHost = mock(VirtualHost.class); + when(_virtualHost.getEventLogger()).thenReturn(new EventLogger()); + + } + + @SuppressWarnings("unchecked") + public void testRecoveryOfSingleMessageOnSingleQueue() + { + final AMQQueue<?> queue = createRegisteredMockQueue(); + + final long messageId = 1; + final StoredMessage<StorableMessageMetaData> storedMessage = createMockStoredMessage(messageId); + + MessageStore store = new NullMessageStore() + { + @Override + public void visitMessages(MessageHandler handler) throws StoreException + { + handler.handle(storedMessage); + } + + @Override + public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException + { + handler.handle(queue.getId(), messageId); + } + }; + + when(_virtualHost.getMessageStore()).thenReturn(store); + + MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class)); + recoverer.recover(); + + ServerMessage<?> message = storedMessage.getMetaData().getType().createMessage(storedMessage); + verify(queue, times(1)).enqueue(eq(message), (Action<? super MessageInstance>)isNull()); + } + + @SuppressWarnings("unchecked") + public void testRecoveryOfMessageInstanceForNonExistingMessage() + { + final AMQQueue<?> queue = createRegisteredMockQueue(); + + final long messageId = 1; + final Transaction transaction = mock(Transaction.class); + + MessageStore store = new NullMessageStore() + { + @Override + public void visitMessages(MessageHandler handler) throws StoreException + { + // no message to visit + } + + @Override + public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException + { + handler.handle(queue.getId(), messageId); + } + + @Override + public Transaction newTransaction() + { + return transaction; + } + }; + + when(_virtualHost.getMessageStore()).thenReturn(store); + + MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class)); + recoverer.recover(); + + verify(queue, never()).enqueue(any(ServerMessage.class), any(Action.class)); + verify(transaction).dequeueMessage(same(queue), argThat(new MessageNumberMatcher(messageId))); + verify(transaction, times(1)).commitTranAsync(); + } + + public void testRecoveryOfMessageInstanceForNonExistingQueue() + { + final UUID queueId = UUID.randomUUID(); + final Transaction transaction = mock(Transaction.class); + final long messageId = 1; + final StoredMessage<StorableMessageMetaData> storedMessage = createMockStoredMessage(messageId); + + MessageStore store = new NullMessageStore() + { + @Override + public void visitMessages(MessageHandler handler) throws StoreException + { + handler.handle(storedMessage); + } + + @Override + public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException + { + handler.handle(queueId, messageId); + } + + @Override + public Transaction newTransaction() + { + return transaction; + } + }; + + when(_virtualHost.getMessageStore()).thenReturn(store); + + MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class)); + recoverer.recover(); + + verify(transaction).dequeueMessage(argThat(new QueueIdMatcher(queueId)), argThat(new MessageNumberMatcher(messageId))); + verify(transaction, times(1)).commitTranAsync(); + } + + public void testRecoveryDeletesOrphanMessages() + { + + final long messageId = 1; + final StoredMessage<StorableMessageMetaData> storedMessage = createMockStoredMessage(messageId); + + MessageStore store = new NullMessageStore() + { + @Override + public void visitMessages(MessageHandler handler) throws StoreException + { + handler.handle(storedMessage); + } + + @Override + public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException + { + // No messages instances + } + }; + + when(_virtualHost.getMessageStore()).thenReturn(store); + + MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class)); + recoverer.recover(); + + verify(storedMessage, times(1)).remove(); + } + + @SuppressWarnings("unchecked") + public void testRecoveryOfSingleEnqueueWithDistributedTransaction() + { + AMQQueue<?> queue = createRegisteredMockQueue(); + + final Transaction transaction = mock(Transaction.class); + + final StoredMessage<StorableMessageMetaData> storedMessage = createMockStoredMessage(1); + long messageId = storedMessage.getMessageNumber(); + + EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, storedMessage); + Record enqueueRecord = createMockRecord(queue, enqueueableMessage); + + final long format = 1; + final byte[] globalId = new byte[] {0}; + final byte[] branchId = new byte[] {0}; + final Record[] enqueues = { enqueueRecord }; + final Record[] dequeues = {}; + + MessageStore store = new NullMessageStore() + { + @Override + public void visitMessages(MessageHandler handler) throws StoreException + { + handler.handle(storedMessage); + } + + @Override + public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException + { + // No messages instances + } + + @Override + public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException + { + handler.handle(format, globalId, branchId, enqueues, dequeues); + } + + @Override + public Transaction newTransaction() + { + return transaction; + } + }; + + DtxRegistry dtxRegistry = new DtxRegistry(); + + when(_virtualHost.getMessageStore()).thenReturn(store); + when(_virtualHost.getDtxRegistry()).thenReturn(dtxRegistry); + + MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class)); + recoverer.recover(); + + DtxBranch branch = dtxRegistry.getBranch(new Xid(format, globalId, branchId)); + assertNotNull("Expected dtx branch to be created", branch); + branch.commit(); + + ServerMessage<?> message = storedMessage.getMetaData().getType().createMessage(storedMessage); + verify(queue, times(1)).enqueue(eq(message), (Action<? super MessageInstance>)isNull()); + verify(transaction).commitTran(); + } + + public void testRecoveryOfSingleDequeueWithDistributedTransaction() + { + final AMQQueue<?> queue = createRegisteredMockQueue(); + + + final Transaction transaction = mock(Transaction.class); + + final StoredMessage<StorableMessageMetaData> storedMessage = createMockStoredMessage(1); + final long messageId = storedMessage.getMessageNumber(); + + EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, storedMessage); + Record dequeueRecord = createMockRecord(queue, enqueueableMessage); + + QueueEntry queueEntry = mock(QueueEntry.class); + when(queue.getMessageOnTheQueue(messageId)).thenReturn(queueEntry); + + final long format = 1; + final byte[] globalId = new byte[] {0}; + final byte[] branchId = new byte[] {0}; + final Record[] enqueues = {}; + final Record[] dequeues = { dequeueRecord }; + + MessageStore store = new NullMessageStore() + { + @Override + public void visitMessages(MessageHandler handler) throws StoreException + { + handler.handle(storedMessage); + } + + @Override + public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException + { + // We need the message to be enqueued onto the queue so that later the distributed transaction + // can dequeue it. + handler.handle(queue.getId(), messageId); + } + + @Override + public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException + { + handler.handle(format, globalId, branchId, enqueues, dequeues); + } + + @Override + public Transaction newTransaction() + { + return transaction; + } + }; + + DtxRegistry dtxRegistry = new DtxRegistry(); + + when(_virtualHost.getMessageStore()).thenReturn(store); + when(_virtualHost.getDtxRegistry()).thenReturn(dtxRegistry); + + MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class)); + recoverer.recover(); + + DtxBranch branch = dtxRegistry.getBranch(new Xid(format, globalId, branchId)); + assertNotNull("Expected dtx branch to be created", branch); + branch.commit(); + + verify(queueEntry, times(1)).delete(); + verify(transaction).commitTran(); + } + + + protected Record createMockRecord(AMQQueue<?> queue, EnqueueableMessage enqueueableMessage) + { + Record enqueueRecord = mock(Record.class); + when(enqueueRecord.getMessage()).thenReturn(enqueueableMessage); + when(enqueueRecord.getResource()).thenReturn(queue); + return enqueueRecord; + } + + protected EnqueueableMessage createMockEnqueueableMessage(long messageId, + final StoredMessage<StorableMessageMetaData> storedMessage) + { + EnqueueableMessage enqueueableMessage = mock(EnqueueableMessage.class); + when(enqueueableMessage.getMessageNumber()).thenReturn(messageId); + when(enqueueableMessage.getStoredMessage()).thenReturn(storedMessage); + return enqueueableMessage; + } + + private StoredMessage<StorableMessageMetaData> createMockStoredMessage(final long messageId) + { + TestMessageMetaData metaData = new TestMessageMetaData(messageId, 0); + + @SuppressWarnings("unchecked") + final StoredMessage<StorableMessageMetaData> storedMessage = mock(StoredMessage.class); + when(storedMessage.getMessageNumber()).thenReturn(messageId); + when(storedMessage.getMetaData()).thenReturn(metaData); + return storedMessage; + } + + private AMQQueue<?> createRegisteredMockQueue() + { + AMQQueue<?> queue = mock(AMQQueue.class); + final UUID queueId = UUID.randomUUID(); + when(queue.getId()).thenReturn(queueId); + when(queue.getName()).thenReturn("test-queue"); + when(_virtualHost.getQueue(queueId)).thenReturn(queue); + return queue; + } + + + private final class QueueIdMatcher extends ArgumentMatcher<TransactionLogResource> + { + private UUID _queueId; + public QueueIdMatcher(UUID queueId) + { + _queueId = queueId; + } + + @Override + public boolean matches(Object argument) + { + return argument instanceof TransactionLogResource && _queueId.equals( ((TransactionLogResource)argument).getId() ); + } + } + + private final class MessageNumberMatcher extends ArgumentMatcher<EnqueueableMessage> + { + private final long _messageId; + + private MessageNumberMatcher(long messageId) + { + _messageId = messageId; + } + + @Override + public boolean matches(Object argument) + { + return argument instanceof EnqueueableMessage && ((EnqueueableMessage)argument).getMessageNumber() == _messageId; + } + } +} |