diff options
Diffstat (limited to 'qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java')
-rw-r--r-- | qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java | 404 |
1 files changed, 327 insertions, 77 deletions
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java index 51d3fc15d2..8bf981bd7b 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java @@ -21,29 +21,31 @@ package org.apache.qpid.server.store; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; import org.apache.qpid.server.store.Transaction.Record; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; import org.apache.qpid.test.utils.QpidTestCase; +import org.mockito.ArgumentMatcher; public abstract class MessageStoreTestCase extends QpidTestCase { - private MessageStoreRecoveryHandler _messageStoreRecoveryHandler; - private StoredMessageRecoveryHandler _storedMessageRecoveryHandler; - private TransactionLogRecoveryHandler _logRecoveryHandler; - private TransactionLogRecoveryHandler.QueueEntryRecoveryHandler _queueEntryRecoveryHandler; - private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler _dtxRecordRecoveryHandler; - private MessageStore _store; private Map<String, Object> _storeSettings; private ConfiguredObject<?> _parent; @@ -55,35 +57,34 @@ public abstract class MessageStoreTestCase extends QpidTestCase _parent = mock(ConfiguredObject.class); when(_parent.getName()).thenReturn("test"); - _storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class); - _logRecoveryHandler = mock(TransactionLogRecoveryHandler.class); - _messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class); - _queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class); - _dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class); - - when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler); - when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler); - when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler); - _storeSettings = getStoreSettings(); _store = createMessageStore(); _store.openMessageStore(_parent, _storeSettings); - _store.recoverMessageStore(_messageStoreRecoveryHandler, _logRecoveryHandler); + } protected abstract Map<String, Object> getStoreSettings() throws Exception; protected abstract MessageStore createMessageStore(); - public MessageStore getStore() + protected MessageStore getStore() { return _store; } - public void testRecordXid() throws Exception + protected void reopenStore() throws Exception { + _store.closeMessageStore(); + + _store = createMessageStore(); + _store.openMessageStore(_parent, _storeSettings); + } + + public void testAddAndRemoveRecordXid() throws Exception + { + long format = 1l; Record enqueueRecord = getTestRecord(1); Record dequeueRecord = getTestRecord(2); Record[] enqueues = { enqueueRecord }; @@ -92,27 +93,287 @@ public abstract class MessageStoreTestCase extends QpidTestCase byte[] branchId = new byte[] { 2 }; Transaction transaction = _store.newTransaction(); - transaction.recordXid(1l, globalId, branchId, enqueues, dequeues); + transaction.recordXid(format, globalId, branchId, enqueues, dequeues); transaction.commitTran(); + reopenStore(); - verify(_dtxRecordRecoveryHandler).dtxRecord(1l, globalId, branchId, enqueues, dequeues); + + DistributedTransactionHandler handler = mock(DistributedTransactionHandler.class); + _store.visitDistributedTransactions(handler); + verify(handler, times(1)).handle(format,globalId, branchId, enqueues, dequeues); transaction = _store.newTransaction(); transaction.removeXid(1l, globalId, branchId); transaction.commitTran(); reopenStore(); - verify(_dtxRecordRecoveryHandler, times(1)).dtxRecord(1l, globalId, branchId, enqueues, dequeues); + + handler = mock(DistributedTransactionHandler.class); + _store.visitDistributedTransactions(handler); + verify(handler, never()).handle(format,globalId, branchId, enqueues, dequeues); } - private void reopenStore() throws Exception + public void testVisitMessages() throws Exception { - _store.closeMessageStore(); + long messageId = 1; + int contentSize = 0; + final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)); + StoreFuture flushFuture = message.flushToStore(); + flushFuture.waitForCompletion(); - _store = createMessageStore(); - _store.openMessageStore(_parent, _storeSettings); - _store.recoverMessageStore(_messageStoreRecoveryHandler, _logRecoveryHandler); + MessageHandler handler = mock(MessageHandler.class); + _store.visitMessages(handler); + + verify(handler, times(1)).handle(argThat(new MessageMetaDataMatcher(messageId))); + + } + + public void testVisitMessagesAborted() throws Exception + { + int contentSize = 0; + for (int i = 0; i < 3; i++) + { + final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize)); + StoreFuture flushFuture = message.flushToStore(); + flushFuture.waitForCompletion(); + } + + MessageHandler handler = mock(MessageHandler.class); + when(handler.handle(any(StoredMessage.class))).thenReturn(true, false); + + _store.visitMessages(handler); + + verify(handler, times(2)).handle(any(StoredMessage.class)); + } + + public void testReopenedMessageStoreUsesLastMessageId() throws Exception + { + int contentSize = 0; + for (int i = 0; i < 3; i++) + { + final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize)); + StoreFuture flushFuture = message.flushToStore(); + flushFuture.waitForCompletion(); + } + + reopenStore(); + + final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(4, contentSize)); + + StoreFuture flushFuture = message.flushToStore(); + flushFuture.waitForCompletion(); + + assertEquals("Unexpected message id", 4, message.getMessageNumber()); + } + + public void testVisitMessageInstances() throws Exception + { + long messageId = 1; + int contentSize = 0; + final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)); + StoreFuture flushFuture = message.flushToStore(); + flushFuture.waitForCompletion(); + + EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, message); + + UUID queueId = UUID.randomUUID(); + TransactionLogResource queue = createTransactionLogResource(queueId); + + Transaction transaction = _store.newTransaction(); + transaction.enqueueMessage(queue, enqueueableMessage); + transaction.commitTran(); + + MessageInstanceHandler handler = mock(MessageInstanceHandler.class); + _store.visitMessageInstances(handler); + + verify(handler, times(1)).handle(queueId, messageId); + } + + public void testVisitDistributedTransactions() throws Exception + { + long format = 1l; + byte[] branchId = new byte[] { 2 }; + byte[] globalId = new byte[] { 1 }; + Record enqueueRecord = getTestRecord(1); + Record dequeueRecord = getTestRecord(2); + Record[] enqueues = { enqueueRecord }; + Record[] dequeues = { dequeueRecord }; + + Transaction transaction = _store.newTransaction(); + transaction.recordXid(format, globalId, branchId, enqueues, dequeues); + transaction.commitTran(); + + DistributedTransactionHandler handler = mock(DistributedTransactionHandler.class); + _store.visitDistributedTransactions(handler); + + verify(handler, times(1)).handle(format,globalId, branchId, enqueues, dequeues); + + } + + public void testCommitTransaction() throws Exception + { + final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); + TransactionLogResource mockQueue = createTransactionLogResource(mockQueueId); + + Transaction txn = getStore().newTransaction(); + + long messageId1 = 1L; + long messageId2 = 5L; + final EnqueueableMessage enqueueableMessage1 = createEnqueueableMessage(messageId1); + final EnqueueableMessage enqueueableMessage2 = createEnqueueableMessage(messageId2); + + txn.enqueueMessage(mockQueue, enqueueableMessage1); + txn.enqueueMessage(mockQueue, enqueueableMessage2); + txn.commitTran(); + + QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId); + getStore().visitMessageInstances(filter); + Set<Long> enqueuedIds = filter.getEnqueuedIds(); + + assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); + assertTrue("Message with id " + messageId1 + " is not found", enqueuedIds.contains(messageId1)); + assertTrue("Message with id " + messageId2 + " is not found", enqueuedIds.contains(messageId2)); } + + public void testRollbackTransactionBeforeCommit() throws Exception + { + final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); + TransactionLogResource mockQueue = createTransactionLogResource(mockQueueId); + + long messageId1 = 21L; + long messageId2 = 22L; + long messageId3 = 23L; + final EnqueueableMessage enqueueableMessage1 = createEnqueueableMessage(messageId1); + final EnqueueableMessage enqueueableMessage2 = createEnqueueableMessage(messageId2); + final EnqueueableMessage enqueueableMessage3 = createEnqueueableMessage(messageId3); + + Transaction txn = getStore().newTransaction(); + + txn.enqueueMessage(mockQueue, enqueueableMessage1); + txn.abortTran(); + + txn = getStore().newTransaction(); + txn.enqueueMessage(mockQueue, enqueueableMessage2); + txn.enqueueMessage(mockQueue, enqueueableMessage3); + txn.commitTran(); + + QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId); + getStore().visitMessageInstances(filter); + Set<Long> enqueuedIds = filter.getEnqueuedIds(); + + assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); + assertTrue("Message with id " + messageId2 + " is not found", enqueuedIds.contains(messageId2)); + assertTrue("Message with id " + messageId3 + " is not found", enqueuedIds.contains(messageId3)); + } + + public void testRollbackTransactionAfterCommit() throws Exception + { + final UUID mockQueueId = UUIDGenerator.generateRandomUUID(); + TransactionLogResource mockQueue = createTransactionLogResource(mockQueueId); + + long messageId1 = 30L; + long messageId2 = 31L; + long messageId3 = 32L; + + final EnqueueableMessage enqueueableMessage1 = createEnqueueableMessage(messageId1); + final EnqueueableMessage enqueueableMessage2 = createEnqueueableMessage(messageId2); + final EnqueueableMessage enqueueableMessage3 = createEnqueueableMessage(messageId3); + + Transaction txn = getStore().newTransaction(); + + txn.enqueueMessage(mockQueue, enqueueableMessage1); + txn.commitTran(); + + txn = getStore().newTransaction(); + txn.enqueueMessage(mockQueue, enqueueableMessage2); + txn.abortTran(); + + txn = getStore().newTransaction(); + txn.enqueueMessage(mockQueue, enqueueableMessage3); + txn.commitTran(); + + QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId); + getStore().visitMessageInstances(filter); + Set<Long> enqueuedIds = filter.getEnqueuedIds(); + + assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); + assertTrue("Message with id " + messageId1 + " is not found", enqueuedIds.contains(messageId1)); + assertTrue("Message with id " + messageId3 + " is not found", enqueuedIds.contains(messageId3)); + } + + public void testStoreIgnoresTransientMessage() throws Exception + { + long messageId = 1; + int contentSize = 0; + final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize, false)); + StoreFuture flushFuture = message.flushToStore(); + flushFuture.waitForCompletion(); + + MessageHandler handler = mock(MessageHandler.class); + _store.visitMessages(handler); + + verify(handler, times(0)).handle(argThat(new MessageMetaDataMatcher(messageId))); + } + + public void testAddAndRemoveMessageWithoutContent() throws Exception + { + long messageId = 1; + int contentSize = 0; + final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)); + StoreFuture flushFuture = message.flushToStore(); + flushFuture.waitForCompletion(); + + final AtomicReference<StoredMessage<?>> retrievedMessageRef = new AtomicReference<StoredMessage<?>>(); + _store.visitMessages(new MessageHandler() + { + + @Override + public boolean handle(StoredMessage<?> storedMessage) + { + retrievedMessageRef.set(storedMessage); + return true; + } + }); + + StoredMessage<?> retrievedMessage = retrievedMessageRef.get(); + assertNotNull("Message was not found", retrievedMessageRef); + assertEquals("Unexpected retreived message", message.getMessageNumber(), retrievedMessage.getMessageNumber()); + + retrievedMessage.remove(); + + retrievedMessageRef.set(null); + _store.visitMessages(new MessageHandler() + { + + @Override + public boolean handle(StoredMessage<?> storedMessage) + { + retrievedMessageRef.set(storedMessage); + return true; + } + }); + assertNull(retrievedMessageRef.get()); + } + + + private TransactionLogResource createTransactionLogResource(UUID queueId) + { + TransactionLogResource queue = mock(TransactionLogResource.class); + when(queue.getId()).thenReturn(queueId); + when(queue.getName()).thenReturn("testQueue"); + when(queue.isDurable()).thenReturn(true); + return queue; + } + + private EnqueueableMessage createMockEnqueueableMessage(long messageId, final StoredMessage<TestMessageMetaData> message) + { + EnqueueableMessage enqueueableMessage = mock(EnqueueableMessage.class); + when(enqueueableMessage.isPersistent()).thenReturn(true); + when(enqueueableMessage.getMessageNumber()).thenReturn(messageId); + when(enqueueableMessage.getStoredMessage()).thenReturn(message); + return enqueueableMessage; + } + private Record getTestRecord(long messageNumber) { UUID queueId1 = UUIDGenerator.generateRandomUUID(); @@ -121,77 +382,66 @@ public abstract class MessageStoreTestCase extends QpidTestCase EnqueueableMessage message1 = mock(EnqueueableMessage.class); when(message1.isPersistent()).thenReturn(true); when(message1.getMessageNumber()).thenReturn(messageNumber); - final StoredMessage storedMessage = mock(StoredMessage.class); + final StoredMessage<?> storedMessage = mock(StoredMessage.class); when(storedMessage.getMessageNumber()).thenReturn(messageNumber); when(message1.getStoredMessage()).thenReturn(storedMessage); Record enqueueRecord = new TestRecord(queue1, message1); return enqueueRecord; } - private static class TestRecord implements Record + private EnqueueableMessage createEnqueueableMessage(long messageId1) { - private TransactionLogResource _queue; - private EnqueueableMessage _message; + final StoredMessage<TestMessageMetaData> message1 = _store.addMessage(new TestMessageMetaData(messageId1, 0)); + StoreFuture flushFuture = message1.flushToStore(); + flushFuture.waitForCompletion(); + EnqueueableMessage enqueueableMessage1 = createMockEnqueueableMessage(messageId1, message1); + return enqueueableMessage1; + } - public TestRecord(TransactionLogResource queue, EnqueueableMessage message) + private class MessageMetaDataMatcher extends ArgumentMatcher<StoredMessage<?>> + { + private long _messageNumber; + + public MessageMetaDataMatcher(long messageNumber) { super(); - _queue = queue; - _message = message; + _messageNumber = messageNumber; } - @Override - public TransactionLogResource getResource() + public boolean matches(Object obj) { - return _queue; + return obj instanceof StoredMessage && ((StoredMessage<?>)obj).getMessageNumber() == _messageNumber; } + } - @Override - public EnqueueableMessage getMessage() - { - return _message; - } + private class QueueFilteringMessageInstanceHandler implements MessageInstanceHandler + { + private final UUID _queueId; + private final Set<Long> _enqueuedIds = new HashSet<Long>(); - @Override - public int hashCode() + public QueueFilteringMessageInstanceHandler(UUID queueId) { - final int prime = 31; - int result = 1; - result = prime * result + ((_message == null) ? 0 : new Long(_message.getMessageNumber()).hashCode()); - result = prime * result + ((_queue == null) ? 0 : _queue.getId().hashCode()); - return result; + _queueId = queueId; } @Override - public boolean equals(Object obj) + public boolean handle(UUID queueId, long messageId) { - if (this == obj) - { - return true; - } - if (obj == null) - { - return false; - } - if (!(obj instanceof Record)) + if (queueId.equals(_queueId)) { - return false; + if (_enqueuedIds.contains(messageId)) + { + fail("Queue with id " + _queueId + " contains duplicate message ids"); + } + _enqueuedIds.add(messageId); } - Record other = (Record) obj; - if (_message == null && other.getMessage() != null) - { - return false; - } - if (_queue == null && other.getResource() != null) - { - return false; - } - if (_message.getMessageNumber() != other.getMessage().getMessageNumber()) - { - return false; - } - return _queue.getId().equals(other.getResource().getId()); + return true; } + public Set<Long> getEnqueuedIds() + { + return _enqueuedIds; + } } + } |