summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java')
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java136
1 files changed, 125 insertions, 11 deletions
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
index 7ebfd54df6..45f7a2a39e 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
@@ -23,50 +23,52 @@ package org.apache.qpid.server.store;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
-import org.apache.qpid.server.model.VirtualHost;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
+import org.apache.qpid.server.store.Transaction.Record;
import org.apache.qpid.test.utils.QpidTestCase;
public abstract class MessageStoreTestCase extends QpidTestCase
{
- private ConfigurationRecoveryHandler _recoveryHandler;
private MessageStoreRecoveryHandler _messageStoreRecoveryHandler;
private StoredMessageRecoveryHandler _storedMessageRecoveryHandler;
private TransactionLogRecoveryHandler _logRecoveryHandler;
private TransactionLogRecoveryHandler.QueueEntryRecoveryHandler _queueEntryRecoveryHandler;
private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler _dtxRecordRecoveryHandler;
- private VirtualHost _virtualHost;
private MessageStore _store;
+ private Map<String, Object> _storeSettings;
public void setUp() throws Exception
{
super.setUp();
- _recoveryHandler = mock(ConfigurationRecoveryHandler.class);
_storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class);
_logRecoveryHandler = mock(TransactionLogRecoveryHandler.class);
_messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class);
_queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class);
_dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class);
- _virtualHost = mock(VirtualHost.class);
-
when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler);
when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler);
when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler);
- setUpStoreConfiguration(_virtualHost);
- when(_virtualHost.getName()).thenReturn(getTestName());
+ _storeSettings = getStoreSettings();
_store = createMessageStore();
- ((DurableConfigurationStore)_store).configureConfigStore(_virtualHost, _recoveryHandler);
- _store.configureMessageStore(_virtualHost, _messageStoreRecoveryHandler, _logRecoveryHandler);
+ _store.openMessageStore("test", _storeSettings);
+ _store.recoverMessageStore(_messageStoreRecoveryHandler, _logRecoveryHandler);
}
- protected abstract void setUpStoreConfiguration(VirtualHost virtualHost) throws Exception;
+ protected abstract Map<String, Object> getStoreSettings() throws Exception;
protected abstract MessageStore createMessageStore();
@@ -75,4 +77,116 @@ public abstract class MessageStoreTestCase extends QpidTestCase
return _store;
}
+ public void testRecordXid() throws Exception
+ {
+ Record enqueueRecord = getTestRecord(1);
+ Record dequeueRecord = getTestRecord(2);
+ Record[] enqueues = { enqueueRecord };
+ Record[] dequeues = { dequeueRecord };
+ byte[] globalId = new byte[] { 1 };
+ byte[] branchId = new byte[] { 2 };
+
+ Transaction transaction = _store.newTransaction();
+ transaction.recordXid(1l, globalId, branchId, enqueues, dequeues);
+ transaction.commitTran();
+ reopenStore();
+ verify(_dtxRecordRecoveryHandler).dtxRecord(1l, globalId, branchId, enqueues, dequeues);
+
+ transaction = _store.newTransaction();
+ transaction.removeXid(1l, globalId, branchId);
+ transaction.commitTran();
+
+ reopenStore();
+ verify(_dtxRecordRecoveryHandler, times(1)).dtxRecord(1l, globalId, branchId, enqueues, dequeues);
+ }
+
+ private void reopenStore() throws Exception
+ {
+ _store.closeMessageStore();
+
+ _store = createMessageStore();
+ _store.openMessageStore("test", _storeSettings);
+ _store.recoverMessageStore(_messageStoreRecoveryHandler, _logRecoveryHandler);
+ }
+ private Record getTestRecord(long messageNumber)
+ {
+ UUID queueId1 = UUIDGenerator.generateRandomUUID();
+ TransactionLogResource queue1 = mock(TransactionLogResource.class);
+ when(queue1.getId()).thenReturn(queueId1);
+ EnqueueableMessage message1 = mock(EnqueueableMessage.class);
+ when(message1.isPersistent()).thenReturn(true);
+ when(message1.getMessageNumber()).thenReturn(messageNumber);
+ final StoredMessage storedMessage = mock(StoredMessage.class);
+ when(storedMessage.getMessageNumber()).thenReturn(messageNumber);
+ when(message1.getStoredMessage()).thenReturn(storedMessage);
+ Record enqueueRecord = new TestRecord(queue1, message1);
+ return enqueueRecord;
+ }
+
+ private static class TestRecord implements Record
+ {
+ private TransactionLogResource _queue;
+ private EnqueueableMessage _message;
+
+ public TestRecord(TransactionLogResource queue, EnqueueableMessage message)
+ {
+ super();
+ _queue = queue;
+ _message = message;
+ }
+
+ @Override
+ public TransactionLogResource getResource()
+ {
+ return _queue;
+ }
+
+ @Override
+ public EnqueueableMessage getMessage()
+ {
+ return _message;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((_message == null) ? 0 : new Long(_message.getMessageNumber()).hashCode());
+ result = prime * result + ((_queue == null) ? 0 : _queue.getId().hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ {
+ return true;
+ }
+ if (obj == null)
+ {
+ return false;
+ }
+ if (!(obj instanceof Record))
+ {
+ return false;
+ }
+ Record other = (Record) obj;
+ if (_message == null && other.getMessage() != null)
+ {
+ return false;
+ }
+ if (_queue == null && other.getResource() != null)
+ {
+ return false;
+ }
+ if (_message.getMessageNumber() != other.getMessage().getMessageNumber())
+ {
+ return false;
+ }
+ return _queue.getId().equals(other.getResource().getId());
+ }
+
+ }
}