summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java')
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java404
1 files changed, 327 insertions, 77 deletions
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
index 51d3fc15d2..8bf981bd7b 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
@@ -21,29 +21,31 @@
package org.apache.qpid.server.store;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
import org.apache.qpid.server.store.Transaction.Record;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.test.utils.QpidTestCase;
+import org.mockito.ArgumentMatcher;
public abstract class MessageStoreTestCase extends QpidTestCase
{
- private MessageStoreRecoveryHandler _messageStoreRecoveryHandler;
- private StoredMessageRecoveryHandler _storedMessageRecoveryHandler;
- private TransactionLogRecoveryHandler _logRecoveryHandler;
- private TransactionLogRecoveryHandler.QueueEntryRecoveryHandler _queueEntryRecoveryHandler;
- private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler _dtxRecordRecoveryHandler;
-
private MessageStore _store;
private Map<String, Object> _storeSettings;
private ConfiguredObject<?> _parent;
@@ -55,35 +57,34 @@ public abstract class MessageStoreTestCase extends QpidTestCase
_parent = mock(ConfiguredObject.class);
when(_parent.getName()).thenReturn("test");
- _storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class);
- _logRecoveryHandler = mock(TransactionLogRecoveryHandler.class);
- _messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class);
- _queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class);
- _dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class);
-
- when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler);
- when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler);
- when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler);
-
_storeSettings = getStoreSettings();
_store = createMessageStore();
_store.openMessageStore(_parent, _storeSettings);
- _store.recoverMessageStore(_messageStoreRecoveryHandler, _logRecoveryHandler);
+
}
protected abstract Map<String, Object> getStoreSettings() throws Exception;
protected abstract MessageStore createMessageStore();
- public MessageStore getStore()
+ protected MessageStore getStore()
{
return _store;
}
- public void testRecordXid() throws Exception
+ protected void reopenStore() throws Exception
{
+ _store.closeMessageStore();
+
+ _store = createMessageStore();
+ _store.openMessageStore(_parent, _storeSettings);
+ }
+
+ public void testAddAndRemoveRecordXid() throws Exception
+ {
+ long format = 1l;
Record enqueueRecord = getTestRecord(1);
Record dequeueRecord = getTestRecord(2);
Record[] enqueues = { enqueueRecord };
@@ -92,27 +93,287 @@ public abstract class MessageStoreTestCase extends QpidTestCase
byte[] branchId = new byte[] { 2 };
Transaction transaction = _store.newTransaction();
- transaction.recordXid(1l, globalId, branchId, enqueues, dequeues);
+ transaction.recordXid(format, globalId, branchId, enqueues, dequeues);
transaction.commitTran();
+
reopenStore();
- verify(_dtxRecordRecoveryHandler).dtxRecord(1l, globalId, branchId, enqueues, dequeues);
+
+ DistributedTransactionHandler handler = mock(DistributedTransactionHandler.class);
+ _store.visitDistributedTransactions(handler);
+ verify(handler, times(1)).handle(format,globalId, branchId, enqueues, dequeues);
transaction = _store.newTransaction();
transaction.removeXid(1l, globalId, branchId);
transaction.commitTran();
reopenStore();
- verify(_dtxRecordRecoveryHandler, times(1)).dtxRecord(1l, globalId, branchId, enqueues, dequeues);
+
+ handler = mock(DistributedTransactionHandler.class);
+ _store.visitDistributedTransactions(handler);
+ verify(handler, never()).handle(format,globalId, branchId, enqueues, dequeues);
}
- private void reopenStore() throws Exception
+ public void testVisitMessages() throws Exception
{
- _store.closeMessageStore();
+ long messageId = 1;
+ int contentSize = 0;
+ final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
+ StoreFuture flushFuture = message.flushToStore();
+ flushFuture.waitForCompletion();
- _store = createMessageStore();
- _store.openMessageStore(_parent, _storeSettings);
- _store.recoverMessageStore(_messageStoreRecoveryHandler, _logRecoveryHandler);
+ MessageHandler handler = mock(MessageHandler.class);
+ _store.visitMessages(handler);
+
+ verify(handler, times(1)).handle(argThat(new MessageMetaDataMatcher(messageId)));
+
+ }
+
+ public void testVisitMessagesAborted() throws Exception
+ {
+ int contentSize = 0;
+ for (int i = 0; i < 3; i++)
+ {
+ final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize));
+ StoreFuture flushFuture = message.flushToStore();
+ flushFuture.waitForCompletion();
+ }
+
+ MessageHandler handler = mock(MessageHandler.class);
+ when(handler.handle(any(StoredMessage.class))).thenReturn(true, false);
+
+ _store.visitMessages(handler);
+
+ verify(handler, times(2)).handle(any(StoredMessage.class));
+ }
+
+ public void testReopenedMessageStoreUsesLastMessageId() throws Exception
+ {
+ int contentSize = 0;
+ for (int i = 0; i < 3; i++)
+ {
+ final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize));
+ StoreFuture flushFuture = message.flushToStore();
+ flushFuture.waitForCompletion();
+ }
+
+ reopenStore();
+
+ final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(4, contentSize));
+
+ StoreFuture flushFuture = message.flushToStore();
+ flushFuture.waitForCompletion();
+
+ assertEquals("Unexpected message id", 4, message.getMessageNumber());
+ }
+
+ public void testVisitMessageInstances() throws Exception
+ {
+ long messageId = 1;
+ int contentSize = 0;
+ final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
+ StoreFuture flushFuture = message.flushToStore();
+ flushFuture.waitForCompletion();
+
+ EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, message);
+
+ UUID queueId = UUID.randomUUID();
+ TransactionLogResource queue = createTransactionLogResource(queueId);
+
+ Transaction transaction = _store.newTransaction();
+ transaction.enqueueMessage(queue, enqueueableMessage);
+ transaction.commitTran();
+
+ MessageInstanceHandler handler = mock(MessageInstanceHandler.class);
+ _store.visitMessageInstances(handler);
+
+ verify(handler, times(1)).handle(queueId, messageId);
+ }
+
+ public void testVisitDistributedTransactions() throws Exception
+ {
+ long format = 1l;
+ byte[] branchId = new byte[] { 2 };
+ byte[] globalId = new byte[] { 1 };
+ Record enqueueRecord = getTestRecord(1);
+ Record dequeueRecord = getTestRecord(2);
+ Record[] enqueues = { enqueueRecord };
+ Record[] dequeues = { dequeueRecord };
+
+ Transaction transaction = _store.newTransaction();
+ transaction.recordXid(format, globalId, branchId, enqueues, dequeues);
+ transaction.commitTran();
+
+ DistributedTransactionHandler handler = mock(DistributedTransactionHandler.class);
+ _store.visitDistributedTransactions(handler);
+
+ verify(handler, times(1)).handle(format,globalId, branchId, enqueues, dequeues);
+
+ }
+
+ public void testCommitTransaction() throws Exception
+ {
+ final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
+ TransactionLogResource mockQueue = createTransactionLogResource(mockQueueId);
+
+ Transaction txn = getStore().newTransaction();
+
+ long messageId1 = 1L;
+ long messageId2 = 5L;
+ final EnqueueableMessage enqueueableMessage1 = createEnqueueableMessage(messageId1);
+ final EnqueueableMessage enqueueableMessage2 = createEnqueueableMessage(messageId2);
+
+ txn.enqueueMessage(mockQueue, enqueueableMessage1);
+ txn.enqueueMessage(mockQueue, enqueueableMessage2);
+ txn.commitTran();
+
+ QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId);
+ getStore().visitMessageInstances(filter);
+ Set<Long> enqueuedIds = filter.getEnqueuedIds();
+
+ assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+ assertTrue("Message with id " + messageId1 + " is not found", enqueuedIds.contains(messageId1));
+ assertTrue("Message with id " + messageId2 + " is not found", enqueuedIds.contains(messageId2));
}
+
+ public void testRollbackTransactionBeforeCommit() throws Exception
+ {
+ final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
+ TransactionLogResource mockQueue = createTransactionLogResource(mockQueueId);
+
+ long messageId1 = 21L;
+ long messageId2 = 22L;
+ long messageId3 = 23L;
+ final EnqueueableMessage enqueueableMessage1 = createEnqueueableMessage(messageId1);
+ final EnqueueableMessage enqueueableMessage2 = createEnqueueableMessage(messageId2);
+ final EnqueueableMessage enqueueableMessage3 = createEnqueueableMessage(messageId3);
+
+ Transaction txn = getStore().newTransaction();
+
+ txn.enqueueMessage(mockQueue, enqueueableMessage1);
+ txn.abortTran();
+
+ txn = getStore().newTransaction();
+ txn.enqueueMessage(mockQueue, enqueueableMessage2);
+ txn.enqueueMessage(mockQueue, enqueueableMessage3);
+ txn.commitTran();
+
+ QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId);
+ getStore().visitMessageInstances(filter);
+ Set<Long> enqueuedIds = filter.getEnqueuedIds();
+
+ assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+ assertTrue("Message with id " + messageId2 + " is not found", enqueuedIds.contains(messageId2));
+ assertTrue("Message with id " + messageId3 + " is not found", enqueuedIds.contains(messageId3));
+ }
+
+ public void testRollbackTransactionAfterCommit() throws Exception
+ {
+ final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
+ TransactionLogResource mockQueue = createTransactionLogResource(mockQueueId);
+
+ long messageId1 = 30L;
+ long messageId2 = 31L;
+ long messageId3 = 32L;
+
+ final EnqueueableMessage enqueueableMessage1 = createEnqueueableMessage(messageId1);
+ final EnqueueableMessage enqueueableMessage2 = createEnqueueableMessage(messageId2);
+ final EnqueueableMessage enqueueableMessage3 = createEnqueueableMessage(messageId3);
+
+ Transaction txn = getStore().newTransaction();
+
+ txn.enqueueMessage(mockQueue, enqueueableMessage1);
+ txn.commitTran();
+
+ txn = getStore().newTransaction();
+ txn.enqueueMessage(mockQueue, enqueueableMessage2);
+ txn.abortTran();
+
+ txn = getStore().newTransaction();
+ txn.enqueueMessage(mockQueue, enqueueableMessage3);
+ txn.commitTran();
+
+ QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId);
+ getStore().visitMessageInstances(filter);
+ Set<Long> enqueuedIds = filter.getEnqueuedIds();
+
+ assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+ assertTrue("Message with id " + messageId1 + " is not found", enqueuedIds.contains(messageId1));
+ assertTrue("Message with id " + messageId3 + " is not found", enqueuedIds.contains(messageId3));
+ }
+
+ public void testStoreIgnoresTransientMessage() throws Exception
+ {
+ long messageId = 1;
+ int contentSize = 0;
+ final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize, false));
+ StoreFuture flushFuture = message.flushToStore();
+ flushFuture.waitForCompletion();
+
+ MessageHandler handler = mock(MessageHandler.class);
+ _store.visitMessages(handler);
+
+ verify(handler, times(0)).handle(argThat(new MessageMetaDataMatcher(messageId)));
+ }
+
+ public void testAddAndRemoveMessageWithoutContent() throws Exception
+ {
+ long messageId = 1;
+ int contentSize = 0;
+ final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
+ StoreFuture flushFuture = message.flushToStore();
+ flushFuture.waitForCompletion();
+
+ final AtomicReference<StoredMessage<?>> retrievedMessageRef = new AtomicReference<StoredMessage<?>>();
+ _store.visitMessages(new MessageHandler()
+ {
+
+ @Override
+ public boolean handle(StoredMessage<?> storedMessage)
+ {
+ retrievedMessageRef.set(storedMessage);
+ return true;
+ }
+ });
+
+ StoredMessage<?> retrievedMessage = retrievedMessageRef.get();
+ assertNotNull("Message was not found", retrievedMessageRef);
+ assertEquals("Unexpected retreived message", message.getMessageNumber(), retrievedMessage.getMessageNumber());
+
+ retrievedMessage.remove();
+
+ retrievedMessageRef.set(null);
+ _store.visitMessages(new MessageHandler()
+ {
+
+ @Override
+ public boolean handle(StoredMessage<?> storedMessage)
+ {
+ retrievedMessageRef.set(storedMessage);
+ return true;
+ }
+ });
+ assertNull(retrievedMessageRef.get());
+ }
+
+
+ private TransactionLogResource createTransactionLogResource(UUID queueId)
+ {
+ TransactionLogResource queue = mock(TransactionLogResource.class);
+ when(queue.getId()).thenReturn(queueId);
+ when(queue.getName()).thenReturn("testQueue");
+ when(queue.isDurable()).thenReturn(true);
+ return queue;
+ }
+
+ private EnqueueableMessage createMockEnqueueableMessage(long messageId, final StoredMessage<TestMessageMetaData> message)
+ {
+ EnqueueableMessage enqueueableMessage = mock(EnqueueableMessage.class);
+ when(enqueueableMessage.isPersistent()).thenReturn(true);
+ when(enqueueableMessage.getMessageNumber()).thenReturn(messageId);
+ when(enqueueableMessage.getStoredMessage()).thenReturn(message);
+ return enqueueableMessage;
+ }
+
private Record getTestRecord(long messageNumber)
{
UUID queueId1 = UUIDGenerator.generateRandomUUID();
@@ -121,77 +382,66 @@ public abstract class MessageStoreTestCase extends QpidTestCase
EnqueueableMessage message1 = mock(EnqueueableMessage.class);
when(message1.isPersistent()).thenReturn(true);
when(message1.getMessageNumber()).thenReturn(messageNumber);
- final StoredMessage storedMessage = mock(StoredMessage.class);
+ final StoredMessage<?> storedMessage = mock(StoredMessage.class);
when(storedMessage.getMessageNumber()).thenReturn(messageNumber);
when(message1.getStoredMessage()).thenReturn(storedMessage);
Record enqueueRecord = new TestRecord(queue1, message1);
return enqueueRecord;
}
- private static class TestRecord implements Record
+ private EnqueueableMessage createEnqueueableMessage(long messageId1)
{
- private TransactionLogResource _queue;
- private EnqueueableMessage _message;
+ final StoredMessage<TestMessageMetaData> message1 = _store.addMessage(new TestMessageMetaData(messageId1, 0));
+ StoreFuture flushFuture = message1.flushToStore();
+ flushFuture.waitForCompletion();
+ EnqueueableMessage enqueueableMessage1 = createMockEnqueueableMessage(messageId1, message1);
+ return enqueueableMessage1;
+ }
- public TestRecord(TransactionLogResource queue, EnqueueableMessage message)
+ private class MessageMetaDataMatcher extends ArgumentMatcher<StoredMessage<?>>
+ {
+ private long _messageNumber;
+
+ public MessageMetaDataMatcher(long messageNumber)
{
super();
- _queue = queue;
- _message = message;
+ _messageNumber = messageNumber;
}
- @Override
- public TransactionLogResource getResource()
+ public boolean matches(Object obj)
{
- return _queue;
+ return obj instanceof StoredMessage && ((StoredMessage<?>)obj).getMessageNumber() == _messageNumber;
}
+ }
- @Override
- public EnqueueableMessage getMessage()
- {
- return _message;
- }
+ private class QueueFilteringMessageInstanceHandler implements MessageInstanceHandler
+ {
+ private final UUID _queueId;
+ private final Set<Long> _enqueuedIds = new HashSet<Long>();
- @Override
- public int hashCode()
+ public QueueFilteringMessageInstanceHandler(UUID queueId)
{
- final int prime = 31;
- int result = 1;
- result = prime * result + ((_message == null) ? 0 : new Long(_message.getMessageNumber()).hashCode());
- result = prime * result + ((_queue == null) ? 0 : _queue.getId().hashCode());
- return result;
+ _queueId = queueId;
}
@Override
- public boolean equals(Object obj)
+ public boolean handle(UUID queueId, long messageId)
{
- if (this == obj)
- {
- return true;
- }
- if (obj == null)
- {
- return false;
- }
- if (!(obj instanceof Record))
+ if (queueId.equals(_queueId))
{
- return false;
+ if (_enqueuedIds.contains(messageId))
+ {
+ fail("Queue with id " + _queueId + " contains duplicate message ids");
+ }
+ _enqueuedIds.add(messageId);
}
- Record other = (Record) obj;
- if (_message == null && other.getMessage() != null)
- {
- return false;
- }
- if (_queue == null && other.getResource() != null)
- {
- return false;
- }
- if (_message.getMessageNumber() != other.getMessage().getMessageNumber())
- {
- return false;
- }
- return _queue.getId().equals(other.getResource().getId());
+ return true;
}
+ public Set<Long> getEnqueuedIds()
+ {
+ return _enqueuedIds;
+ }
}
+
}