diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2015-04-03 22:21:05 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2015-04-03 22:21:05 +0000 |
commit | 8bcfb7bb278644a547bddf4719265d806ea69d72 (patch) | |
tree | 6aa69e2381cb2c7d71cbe59cbc8ed8005667937b | |
parent | 47f4f5148f7a6f4fa3c214cc2efd4e4a3f44641c (diff) | |
download | qpid-python-8bcfb7bb278644a547bddf4719265d806ea69d72.tar.gz |
QPID-6476 : [Java Broker] Refactor MessageStore to allow more efficient implementations
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1671184 13f79535-47bb-0310-9956-ffa450edef68
99 files changed, 2292 insertions, 1218 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index c668cc8595..b030b6c091 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -26,6 +26,7 @@ import java.lang.ref.SoftReference; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.UUID; @@ -48,6 +49,8 @@ import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.EventManager; +import org.apache.qpid.server.store.MessageEnqueueRecord; +import org.apache.qpid.server.store.MessageHandle; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoreException; @@ -114,7 +117,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } @Override - public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData) + public <T extends StorableMessageMetaData> MessageHandle<T> addMessage(T metaData) { long newMessageId = getNextMessageId(); @@ -163,154 +166,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore } @Override - public void visitMessages(final MessageHandler handler) throws StoreException + public MessageStoreReader newMessageStoreReader() { - checkMessageStoreOpen(); - visitMessagesInternal(handler, getEnvironmentFacade()); - } - - @Override - public StoredMessage<?> getMessage(final long messageId) - { - checkMessageStoreOpen(); - return getMessageInternal(messageId, getEnvironmentFacade()); - } - - @Override - public void visitMessageInstances(final TransactionLogResource queue, final MessageInstanceHandler handler) throws StoreException - { - checkMessageStoreOpen(); - - Cursor cursor = null; - List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>(); - try - { - cursor = getDeliveryDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - DatabaseEntry value = new DatabaseEntry(); - - QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - keyBinding.objectToEntry(new QueueEntryKey(queue.getId(),0l), key); - - if(cursor.getSearchKeyRange(key,value,LockMode.DEFAULT) == OperationStatus.SUCCESS) - { - QueueEntryKey entry = keyBinding.entryToObject(key); - if(entry.getQueueId().equals(queue.getId())) - { - entries.add(entry); - } - while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS) - { - entry = keyBinding.entryToObject(key); - if(entry.getQueueId().equals(queue.getId())) - { - entries.add(entry); - } - else - { - break; - } - } - } - } - catch (RuntimeException e) - { - throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e); - } - finally - { - closeCursorSafely(cursor, getEnvironmentFacade()); - } - - for(QueueEntryKey entry : entries) - { - UUID queueId = entry.getQueueId(); - long messageId = entry.getMessageId(); - if (!handler.handle(queueId, messageId)) - { - break; - } - } - - } - - - - @Override - public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException - { - checkMessageStoreOpen(); - - Cursor cursor = null; - List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>(); - try - { - cursor = getDeliveryDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - - DatabaseEntry value = new DatabaseEntry(); - while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS) - { - QueueEntryKey entry = keyBinding.entryToObject(key); - entries.add(entry); - } - } - catch (RuntimeException e) - { - throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e); - } - finally - { - closeCursorSafely(cursor, getEnvironmentFacade()); - } - - for(QueueEntryKey entry : entries) - { - UUID queueId = entry.getQueueId(); - long messageId = entry.getMessageId(); - if (!handler.handle(queueId, messageId)) - { - break; - } - } - - } - - @Override - public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException - { - checkMessageStoreOpen(); - - Cursor cursor = null; - try - { - cursor = getXidDb().openCursor(null, null); - DatabaseEntry key = new DatabaseEntry(); - XidBinding keyBinding = XidBinding.getInstance(); - PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); - DatabaseEntry value = new DatabaseEntry(); - - while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) - { - Xid xid = keyBinding.entryToObject(key); - PreparedTransaction preparedTransaction = valueBinding.entryToObject(value); - if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), - preparedTransaction.getEnqueues(), preparedTransaction.getDequeues())) - { - break; - } - } - - } - catch (RuntimeException e) - { - throw getEnvironmentFacade().handleDatabaseException("Cannot recover distributed transactions", e); - } - finally - { - closeCursorSafely(cursor, getEnvironmentFacade()); - } + return new BDBMessageStoreReader(); } /** @@ -793,24 +651,23 @@ public abstract class AbstractBDBMessageStore implements MessageStore * Extracts a message from a specified queue, in a given transaction. * * @param tx The transaction for the operation. - * @param queue The queue to take the message from. + * @param queueId The id of the queue to take the message from. * @param messageId The message to dequeue. * * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist. */ - private void dequeueMessage(final Transaction tx, final TransactionLogResource queue, + private void dequeueMessage(final Transaction tx, final UUID queueId, long messageId) throws StoreException { DatabaseEntry key = new DatabaseEntry(); QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); - QueueEntryKey queueEntryKey = new QueueEntryKey(queue.getId(), messageId); - UUID id = queue.getId(); + QueueEntryKey queueEntryKey = new QueueEntryKey(queueId, messageId); + UUID id = queueId; keyBinding.objectToEntry(queueEntryKey, key); if (getLogger().isDebugEnabled()) { - getLogger().debug("Dequeue message id " + messageId + " from queue " - + queue.getName() + " with id " + id); + getLogger().debug("Dequeue message id " + messageId + " from queue with id " + id); } try @@ -819,19 +676,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore OperationStatus status = getDeliveryDb().delete(tx, key); if (status == OperationStatus.NOTFOUND) { - throw new StoreException("Unable to find message with id " + messageId + " on queue " - + queue.getName() + " with id " + id); + throw new StoreException("Unable to find message with id " + messageId + " on queue with id " + id); } else if (status != OperationStatus.SUCCESS) { - throw new StoreException("Unable to remove message with id " + messageId + " on queue" - + queue.getName() + " with id " + id); + throw new StoreException("Unable to remove message with id " + messageId + " on queue with id " + id); } if (getLogger().isDebugEnabled()) { - getLogger().debug("Removed message " + messageId + " on queue " - + queue.getName() + " with id " + id); + getLogger().debug("Removed message " + messageId + " on queue with id " + id); } } @@ -849,8 +703,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore long format, byte[] globalId, byte[] branchId, - org.apache.qpid.server.store.Transaction.Record[] enqueues, - org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException + org.apache.qpid.server.store.Transaction.EnqueueRecord[] enqueues, + org.apache.qpid.server.store.Transaction.DequeueRecord[] dequeues) throws StoreException { DatabaseEntry key = new DatabaseEntry(); Xid xid = new Xid(format, globalId, branchId); @@ -862,7 +716,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); valueBinding.objectToEntry(preparedTransaction, value); List<Runnable> postActions = new ArrayList<>(); - for(org.apache.qpid.server.store.Transaction.Record enqueue : enqueues) + for(org.apache.qpid.server.store.Transaction.EnqueueRecord enqueue : enqueues) { StoredMessage storedMessage = enqueue.getMessage().getStoredMessage(); if(storedMessage instanceof StoredBDBMessage) @@ -1187,7 +1041,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - final class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T> + final class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>, MessageHandle<T> { private final long _messageId; @@ -1234,7 +1088,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } @Override - public void addContent(int offsetInMessage, ByteBuffer src) + public void addContent(ByteBuffer src) { src = src.slice(); byte[] data = _messageDataRef.getData(); @@ -1259,6 +1113,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore } @Override + public StoredMessage<T> allContentAdded() + { + return this; + } + + @Override public int getContent(int offsetInMessage, ByteBuffer dst) { byte[] data = _messageDataRef.getData(); @@ -1466,7 +1326,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } @Override - public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException + public MessageEnqueueRecord enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException { checkMessageStoreOpen(); @@ -1487,14 +1347,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore } AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber()); + return new BDBEnqueueRecord(queue.getId(), message.getMessageNumber()); } @Override - public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException + public void dequeueMessage(final MessageEnqueueRecord enqueueRecord) { checkMessageStoreOpen(); - AbstractBDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber()); + AbstractBDBMessageStore.this.dequeueMessage(_txn, enqueueRecord.getQueueId(), + enqueueRecord.getMessageNumber()); } @Override @@ -1546,21 +1408,268 @@ public abstract class AbstractBDBMessageStore implements MessageStore } @Override - public void removeXid(long format, byte[] globalId, byte[] branchId) throws StoreException + public void removeXid(final StoredXidRecord record) { checkMessageStoreOpen(); - AbstractBDBMessageStore.this.removeXid(_txn, format, globalId, branchId); + AbstractBDBMessageStore.this.removeXid(_txn, record.getFormat(), record.getGlobalId(), record.getBranchId()); } @Override - public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, - Record[] dequeues) throws StoreException + public StoredXidRecord recordXid(final long format, final byte[] globalId, final byte[] branchId, final EnqueueRecord[] enqueues, + final DequeueRecord[] dequeues) throws StoreException { checkMessageStoreOpen(); _postCommitActions.addAll(AbstractBDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues)); + return new BDBStoredXidRecord(format, globalId, branchId); } + } + private static class BDBStoredXidRecord implements org.apache.qpid.server.store.Transaction.StoredXidRecord + { + private final long _format; + private final byte[] _globalId; + private final byte[] _branchId; + + public BDBStoredXidRecord(final long format, final byte[] globalId, final byte[] branchId) + { + _format = format; + _globalId = globalId; + _branchId = branchId; + } + + @Override + public long getFormat() + { + return _format; + } + + @Override + public byte[] getGlobalId() + { + return _globalId; + } + + @Override + public byte[] getBranchId() + { + return _branchId; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + final BDBStoredXidRecord that = (BDBStoredXidRecord) o; + + return _format == that._format + && Arrays.equals(_globalId, that._globalId) + && Arrays.equals(_branchId, that._branchId); + + } + + @Override + public int hashCode() + { + int result = (int) (_format ^ (_format >>> 32)); + result = 31 * result + Arrays.hashCode(_globalId); + result = 31 * result + Arrays.hashCode(_branchId); + return result; + } + } + public static class BDBEnqueueRecord implements MessageEnqueueRecord + { + private final UUID _queueId; + + private final long _messageNumber; + + public BDBEnqueueRecord(final UUID queueid, final long messageNumber) + { + _queueId = queueid; + _messageNumber = messageNumber; + } + + public long getMessageNumber() + { + return _messageNumber; + } + + public UUID getQueueId() + { + return _queueId; + } + + } + + private class BDBMessageStoreReader implements MessageStoreReader + { + @Override + public void visitMessages(final MessageHandler handler) throws StoreException + { + checkMessageStoreOpen(); + visitMessagesInternal(handler, getEnvironmentFacade()); + } + + @Override + public StoredMessage<?> getMessage(final long messageId) + { + checkMessageStoreOpen(); + return getMessageInternal(messageId, getEnvironmentFacade()); + } + + @Override + public void close() + { + + } + + @Override + public void visitMessageInstances(final TransactionLogResource queue, final MessageInstanceHandler handler) throws StoreException + { + checkMessageStoreOpen(); + + Cursor cursor = null; + List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>(); + try + { + cursor = getDeliveryDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + + QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); + keyBinding.objectToEntry(new QueueEntryKey(queue.getId(),0l), key); + + if(cursor.getSearchKeyRange(key,value,LockMode.DEFAULT) == OperationStatus.SUCCESS) + { + QueueEntryKey entry = keyBinding.entryToObject(key); + if(entry.getQueueId().equals(queue.getId())) + { + entries.add(entry); + } + while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS) + { + entry = keyBinding.entryToObject(key); + if(entry.getQueueId().equals(queue.getId())) + { + entries.add(entry); + } + else + { + break; + } + } + } + } + catch (RuntimeException e) + { + throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e); + } + finally + { + closeCursorSafely(cursor, getEnvironmentFacade()); + } + + for(QueueEntryKey entry : entries) + { + UUID queueId = entry.getQueueId(); + long messageId = entry.getMessageId(); + if (!handler.handle(new BDBEnqueueRecord(queueId, messageId))) + { + break; + } + } + + } + + + + @Override + public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException + { + checkMessageStoreOpen(); + + Cursor cursor = null; + List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>(); + try + { + cursor = getDeliveryDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); + + DatabaseEntry value = new DatabaseEntry(); + while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS) + { + QueueEntryKey entry = keyBinding.entryToObject(key); + entries.add(entry); + } + } + catch (RuntimeException e) + { + throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e); + } + finally + { + closeCursorSafely(cursor, getEnvironmentFacade()); + } + + for(QueueEntryKey entry : entries) + { + UUID queueId = entry.getQueueId(); + long messageId = entry.getMessageId(); + if (!handler.handle(new BDBEnqueueRecord(queueId, messageId))) + { + break; + } + } + + } + + @Override + public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException + { + checkMessageStoreOpen(); + + Cursor cursor = null; + try + { + cursor = getXidDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + XidBinding keyBinding = XidBinding.getInstance(); + PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); + DatabaseEntry value = new DatabaseEntry(); + + while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) + { + Xid xid = keyBinding.entryToObject(key); + PreparedTransaction preparedTransaction = valueBinding.entryToObject(value); + if (!handler.handle(new BDBStoredXidRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId()), + preparedTransaction.getEnqueues(), preparedTransaction.getDequeues())) + { + break; + } + } + + } + catch (RuntimeException e) + { + throw getEnvironmentFacade().handleDatabaseException("Cannot recover distributed transactions", e); + } + finally + { + closeCursorSafely(cursor, getEnvironmentFacade()); + } + } + + + } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java index eb5c4677ff..9ecc5b3283 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java @@ -25,21 +25,21 @@ import org.apache.qpid.server.store.Transaction; public class PreparedTransaction { - private final Transaction.Record[] _enqueues; - private final Transaction.Record[] _dequeues; + private final Transaction.EnqueueRecord[] _enqueues; + private final Transaction.DequeueRecord[] _dequeues; - public PreparedTransaction(Transaction.Record[] enqueues, Transaction.Record[] dequeues) + public PreparedTransaction(Transaction.EnqueueRecord[] enqueues, Transaction.DequeueRecord[] dequeues) { _enqueues = enqueues; _dequeues = dequeues; } - public Transaction.Record[] getEnqueues() + public Transaction.EnqueueRecord[] getEnqueues() { return _enqueues; } - public Transaction.Record[] getDequeues() + public Transaction.DequeueRecord[] getDequeues() { return _dequeues; } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java index 1f4cf45ce1..4d111e5c4f 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java @@ -28,9 +28,11 @@ import com.sleepycat.bind.tuple.TupleInput; import com.sleepycat.bind.tuple.TupleOutput; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.store.MessageDurability; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore; import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction> @@ -38,23 +40,34 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction @Override public PreparedTransaction entryToObject(TupleInput input) { - Transaction.Record[] enqueues = readRecords(input); + Transaction.EnqueueRecord[] enqueues = readEnqueueRecords(input); - Transaction.Record[] dequeues = readRecords(input); + Transaction.DequeueRecord[] dequeues = readDequeueRecords(input); return new PreparedTransaction(enqueues, dequeues); } - private Transaction.Record[] readRecords(TupleInput input) + private Transaction.EnqueueRecord[] readEnqueueRecords(TupleInput input) { - Transaction.Record[] records = new Transaction.Record[input.readInt()]; + Transaction.EnqueueRecord[] records = new Transaction.EnqueueRecord[input.readInt()]; for(int i = 0; i < records.length; i++) { - records[i] = new RecordImpl(new UUID(input.readLong(), input.readLong()), input.readLong()); + records[i] = new EnqueueRecordImpl(new UUID(input.readLong(), input.readLong()), input.readLong()); } return records; } + private Transaction.DequeueRecord[] readDequeueRecords(TupleInput input) + { + Transaction.DequeueRecord[] records = new Transaction.DequeueRecord[input.readInt()]; + for(int i = 0; i < records.length; i++) + { + records[i] = new DequeueRecordImpl(new UUID(input.readLong(), input.readLong()), input.readLong()); + } + return records; + } + + @Override public void objectToEntry(PreparedTransaction preparedTransaction, TupleOutput output) { @@ -63,7 +76,7 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction } - private void writeRecords(Transaction.Record[] records, TupleOutput output) + private void writeRecords(Transaction.EnqueueRecord[] records, TupleOutput output) { if(records == null) { @@ -72,7 +85,7 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction else { output.writeInt(records.length); - for(Transaction.Record record : records) + for(Transaction.EnqueueRecord record : records) { UUID id = record.getResource().getId(); output.writeLong(id.getMostSignificantBits()); @@ -82,13 +95,32 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction } } - private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueueableMessage + private void writeRecords(Transaction.DequeueRecord[] records, TupleOutput output) + { + if(records == null) + { + output.writeInt(0); + } + else + { + output.writeInt(records.length); + for(Transaction.DequeueRecord record : records) + { + UUID id = record.getEnqueueRecord().getQueueId(); + output.writeLong(id.getMostSignificantBits()); + output.writeLong(id.getLeastSignificantBits()); + output.writeLong(record.getEnqueueRecord().getMessageNumber()); + } + } + } + + private static class EnqueueRecordImpl implements Transaction.EnqueueRecord, TransactionLogResource, EnqueueableMessage { private long _messageNumber; private UUID _queueId; - public RecordImpl(UUID queueId, long messageNumber) + public EnqueueRecordImpl(UUID queueId, long messageNumber) { _messageNumber = messageNumber; _queueId = queueId; @@ -137,4 +169,21 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction return MessageDurability.DEFAULT; } } + + private static class DequeueRecordImpl implements Transaction.DequeueRecord + { + + private final AbstractBDBMessageStore.BDBEnqueueRecord _record; + + public DequeueRecordImpl(final UUID queueId, final long messageNumber) + { + _record = new AbstractBDBMessageStore.BDBEnqueueRecord(queueId, messageNumber); + } + + @Override + public MessageEnqueueRecord getEnqueueRecord() + { + return _record; + } + } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index fb0c11f6e5..3f8c1a7a99 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -36,6 +36,7 @@ import org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10; import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10; import org.apache.qpid.server.protocol.v0_8.MessageMetaData; import org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8; +import org.apache.qpid.server.store.MessageHandle; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreTestCase; import org.apache.qpid.server.store.StorableMessageMetaData; @@ -104,14 +105,15 @@ public class BDBMessageStoreTest extends MessageStoreTestCase ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize); MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8); - StoredMessage<MessageMetaData> storedMessage_0_8 = bdbStore.addMessage(messageMetaData_0_8); + MessageHandle<MessageMetaData> messageHandle_0_8 = bdbStore.addMessage(messageMetaData_0_8); long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime(); - long messageid_0_8 = storedMessage_0_8.getMessageNumber(); - storedMessage_0_8.addContent(0, firstContentBytes_0_8); - storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8); - ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore(); + messageHandle_0_8.addContent(firstContentBytes_0_8); + messageHandle_0_8.addContent(secondContentBytes_0_8); + final StoredMessage<MessageMetaData> storedMessage_0_8 = messageHandle_0_8.allContentAdded(); + long messageid_0_8 = storedMessage_0_8.getMessageNumber(); + ((AbstractBDBMessageStore.StoredBDBMessage)messageHandle_0_8).flushToStore(); /* * Create and insert a 0-10 message (metadata and content) @@ -124,13 +126,14 @@ public class BDBMessageStoreTest extends MessageStoreTestCase MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10); MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10); - StoredMessage<MessageMetaData_0_10> storedMessage_0_10 = bdbStore.addMessage(messageMetaData_0_10); + MessageHandle<MessageMetaData_0_10> messageHandle_0_10 = bdbStore.addMessage(messageMetaData_0_10); long origArrivalTime_0_10 = messageMetaData_0_10.getArrivalTime(); - long messageid_0_10 = storedMessage_0_10.getMessageNumber(); - storedMessage_0_10.addContent(0, completeContentBody_0_10); - ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_10).flushToStore(); + messageHandle_0_10.addContent(completeContentBody_0_10); + final StoredMessage<MessageMetaData_0_10> storedMessage_0_10 = messageHandle_0_10.allContentAdded(); + long messageid_0_10 = storedMessage_0_10.getMessageNumber(); + ((AbstractBDBMessageStore.StoredBDBMessage)messageHandle_0_10).flushToStore(); /* * reload the store only (read-only) @@ -352,12 +355,12 @@ public class BDBMessageStoreTest extends MessageStoreTestCase ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize); MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8); - StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8); + MessageHandle<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8); - storedMessage_0_8.addContent(0, chunk1); + storedMessage_0_8.addContent(chunk1); ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore(); - return storedMessage_0_8; + return storedMessage_0_8.allContentAdded(); } public void testOnDelete() throws Exception diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java index 321f8b28e6..ccae965edb 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java @@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.util.StateChangeListener; +import org.apache.qpid.thread.ThreadFactory; public abstract class AbstractConsumerTarget implements ConsumerTarget { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index ac623bc337..6fc940d665 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -66,6 +66,7 @@ import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.StateTransition; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; @@ -522,17 +523,17 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> baseQueues = queues.toArray(new BaseQueue[queues.size()]); } - txn.enqueue(queues,message, new ServerTransaction.Action() + txn.enqueue(queues,message, new ServerTransaction.EnqueueAction() { MessageReference _reference = message.newReference(); - public void postCommit() + public void postCommit(MessageEnqueueRecord... records) { try { for(int i = 0; i < baseQueues.length; i++) { - baseQueues[i].enqueue(message, postEnqueueAction); + baseQueues[i].enqueue(message, postEnqueueAction, records[i]); } } finally diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java index 7cf1963331..e2dc1bf754 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java @@ -25,6 +25,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; @@ -82,15 +83,15 @@ public class DefaultDestination implements MessageDestination } else { - txn.enqueue(q,message, new ServerTransaction.Action() + txn.enqueue(q,message, new ServerTransaction.EnqueueAction() { MessageReference _reference = message.newReference(); - public void postCommit() + public void postCommit(MessageEnqueueRecord... records) { try { - q.enqueue(message, postEnqueueAction); + q.enqueue(message, postEnqueueAction, records[0]); } finally { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java index d3518f428b..1ba506fb7e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.message; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.filter.Filterable; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; @@ -81,6 +82,8 @@ public interface MessageInstance ConsumerImpl getAcquiringConsumer(); + MessageEnqueueRecord getEnqueueRecord(); + public static enum State { AVAILABLE, diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java index 7801cf3d42..4d6216b9df 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java @@ -33,6 +33,7 @@ import java.util.Map; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.AbstractServerMessageImpl; +import org.apache.qpid.server.store.MessageHandle; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -134,11 +135,10 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage, final InternalMessageMetaData metaData = InternalMessageMetaData.create(persistent, internalHeader, bytes.length); - StoredMessage<InternalMessageMetaData> handle = - store.addMessage(metaData); - handle.addContent(0, ByteBuffer.wrap(bytes)); - - return new InternalMessage(handle, internalHeader, bodyObject); + MessageHandle<InternalMessageMetaData> handle = store.addMessage(metaData); + handle.addContent(ByteBuffer.wrap(bytes)); + StoredMessage<InternalMessageMetaData> storedMessage = handle.allContentAdded(); + return new InternalMessage(storedMessage, internalHeader, bodyObject); } catch (IOException e) { @@ -148,7 +148,13 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage, public static InternalMessage createStringMessage(MessageStore store, AMQMessageHeader header, String messageBody) { - return createMessage(store, header, messageBody, false); + return createStringMessage(store, header, messageBody, false); + } + + + public static InternalMessage createStringMessage(MessageStore store, AMQMessageHeader header, String messageBody, boolean persistent) + { + return createMessage(store, header, messageBody, persistent); } public static InternalMessage createMapMessage(MessageStore store, AMQMessageHeader header, Map<? extends Object,? extends Object> messageBody) @@ -163,7 +169,13 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage, public static InternalMessage createBytesMessage(MessageStore store, AMQMessageHeader header, byte[] messageBody) { - return createMessage(store, header, messageBody, false); + return createBytesMessage(store, header, messageBody, false); + } + + + public static InternalMessage createBytesMessage(MessageStore store, AMQMessageHeader header, byte[] messageBody, boolean persist) + { + return createMessage(store, header, messageBody, persist); } public static InternalMessage convert(long messageNumber, boolean persistent, AMQMessageHeader header, Object messageBody) @@ -207,12 +219,6 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage, } @Override - public void addContent(final int offsetInMessage, final ByteBuffer src) - { - throw new UnsupportedOperationException(); - } - - @Override public int getContent(final int offsetInMessage, final ByteBuffer dst) { ByteBuffer buffer = ByteBuffer.wrap(bytes); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java index ed16335cf2..28fe23d11b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java @@ -24,6 +24,7 @@ import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; import java.nio.ByteBuffer; @@ -33,14 +34,27 @@ public class InternalMessageMetaData implements StorableMessageMetaData private boolean _isPersistent; - private byte[] _headerBytes; + private InternalMessageHeader _header; private int _contentSize; + private byte[] _headerBytes; - public InternalMessageMetaData(final boolean isPersistent, final byte[] headerBytes, final int contentSize) + public InternalMessageMetaData(final boolean isPersistent, final InternalMessageHeader header, final int contentSize) { _isPersistent = isPersistent; - _headerBytes = headerBytes; + _header = header; _contentSize = contentSize; + + try(ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + ObjectOutputStream os = new ObjectOutputStream(bytesOut)) + { + os.writeInt(contentSize); + os.writeObject(header); + _headerBytes = bytesOut.toByteArray(); + } + catch (IOException e) + { + throw new ConnectionScopedRuntimeException("Unexpected IO Exception on in memory operation", e); + } } @Override @@ -76,20 +90,7 @@ public class InternalMessageMetaData implements StorableMessageMetaData static InternalMessageMetaData create(boolean persistent, final InternalMessageHeader header, int contentSize) { - ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); - try - { - ObjectOutputStream os = new ObjectOutputStream(bytesOut); - os.writeObject(header); - byte[] bytes = bytesOut.toByteArray(); - - return new InternalMessageMetaData(persistent, bytes, contentSize); - - } - catch (IOException e) - { - throw new ConnectionScopedRuntimeException("Unexpected IO Exception on in memory operation", e); - } + return new InternalMessageMetaData(persistent, header, contentSize); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java index 7fc13fb40b..966e16b22c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java @@ -34,7 +34,7 @@ import org.apache.qpid.util.ByteBufferInputStream; @PluggableService public class InternalMessageMetaDataType implements MessageMetaDataType<InternalMessageMetaData> { - public static final int INTERNAL_ORDINAL = 999; + public static final int INTERNAL_ORDINAL = 255; public static final String TYPE = "INTERNAL"; @Override @@ -46,10 +46,13 @@ public class InternalMessageMetaDataType implements MessageMetaDataType<Internal @Override public InternalMessageMetaData createMetaData(final ByteBuffer buf) { + + try(ObjectInputStream is = new ObjectInputStream(new ByteBufferInputStream(buf))) { - InternalMessageMetaData metaData = (InternalMessageMetaData) is.readObject(); - return metaData; + int contentSize = is.readInt(); + InternalMessageHeader header = (InternalMessageHeader) is.readObject(); + return new InternalMessageMetaData(true, header, contentSize); } catch (IOException e) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index aa64d6947d..aefb96cc9c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.QueueNotificationListener; import org.apache.qpid.server.protocol.CapacityChecker; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.util.Deletable; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -112,7 +113,7 @@ public interface AMQQueue<X extends AMQQueue<X>> void completeRecovery(); - void recover(ServerMessage<?> message); + void recover(ServerMessage<?> message, final MessageEnqueueRecord enqueueRecord); void setTargetSize(long targetSize); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 3b42014136..568fb546b5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -57,7 +57,6 @@ import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.filter.ArrivalTimeFilter; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.logging.EventLogger; @@ -90,6 +89,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.store.MessageDurability; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -1046,7 +1046,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> // ------ Enqueue / Dequeue - public final void enqueue(ServerMessage message, Action<? super MessageInstance> action) + public final void enqueue(ServerMessage message, Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord) { incrementQueueCount(); incrementQueueSize(message); @@ -1060,30 +1060,30 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> if(_recovering.get()) { - EnqueueRequest request = new EnqueueRequest(message, action); + EnqueueRequest request = new EnqueueRequest(message, action, enqueueRecord); _postRecoveryQueue.add(request); // deal with the case the recovering status changed just as we added to the post recovery queue if(!_recovering.get() && _postRecoveryQueue.remove(request)) { - doEnqueue(message, action); + doEnqueue(message, action, enqueueRecord); } } else { - doEnqueue(message, action); + doEnqueue(message, action, enqueueRecord); } } - public final void recover(ServerMessage message) + public final void recover(ServerMessage message, final MessageEnqueueRecord enqueueRecord) { incrementQueueCount(); incrementQueueSize(message); _totalMessagesReceived.incrementAndGet(); - doEnqueue(message, null); + doEnqueue(message, null, enqueueRecord); } @@ -1107,15 +1107,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> { EnqueueRequest request = _postRecoveryQueue.poll(); MessageReference<?> messageReference = request.getMessage(); - doEnqueue(messageReference.getMessage(), request.getAction()); + doEnqueue(messageReference.getMessage(), request.getAction(), request.getEnqueueRecord()); messageReference.release(); } } - protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action) + protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord) { final QueueConsumer<?> exclusiveSub = _exclusiveSubscriber; - final QueueEntry entry = getEntries().add(message); + final QueueEntry entry = getEntries().add(message, enqueueRecord); updateExpiration(entry); try @@ -1812,7 +1812,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> private void dequeueEntry(final QueueEntry node, ServerTransaction txn) { - txn.dequeue(this, node.getMessage(), + txn.dequeue(node.getEnqueueRecord(), new ServerTransaction.Action() { @@ -2660,15 +2660,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> if(!message.isReferenced(this)) { - txn.enqueue(this, message, new ServerTransaction.Action() + txn.enqueue(this, message, new ServerTransaction.EnqueueAction() { MessageReference _reference = message.newReference(); - public void postCommit() + public void postCommit(MessageEnqueueRecord... records) { try { - AbstractQueue.this.enqueue(message, postEnqueueAction); + AbstractQueue.this.enqueue(message, postEnqueueAction, records[0]); } finally { @@ -3140,10 +3140,13 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> { private final MessageReference<?> _message; private final Action<? super MessageInstance> _action; + private final MessageEnqueueRecord _enqueueRecord; public EnqueueRequest(final ServerMessage message, - final Action<? super MessageInstance> action) + final Action<? super MessageInstance> action, + final MessageEnqueueRecord enqueueRecord) { + _enqueueRecord = enqueueRecord; _message = message.newReference(); _action = action; } @@ -3157,5 +3160,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> { return _action; } + + public MessageEnqueueRecord getEnqueueRecord() + { + return _enqueueRecord; + } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java index 01594e68a9..dc2f45a488 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java @@ -23,12 +23,13 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.util.Action; public interface BaseQueue extends TransactionLogResource { - void enqueue(ServerMessage message, Action<? super MessageInstance> action); + void enqueue(ServerMessage message, Action<? super MessageInstance> action, MessageEnqueueRecord record); boolean isDurable(); boolean isDeleted(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java index 1e250ac2c9..1e184da750 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java @@ -31,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -67,9 +68,10 @@ public class LastValueQueueList extends OrderedQueueEntryList } @Override - protected ConflationQueueEntry createQueueEntry(ServerMessage message) + protected ConflationQueueEntry createQueueEntry(ServerMessage message, + final MessageEnqueueRecord enqueueRecord) { - return new ConflationQueueEntry(this, message); + return new ConflationQueueEntry(this, message, enqueueRecord); } @@ -78,9 +80,9 @@ public class LastValueQueueList extends OrderedQueueEntryList * Updates the list using super.add and also updates {@link #_latestValuesMap} and discards entries as necessary. */ @Override - public ConflationQueueEntry add(final ServerMessage message) + public ConflationQueueEntry add(final ServerMessage message, final MessageEnqueueRecord enqueueRecord) { - final ConflationQueueEntry addedEntry = (ConflationQueueEntry) super.add(message); + final ConflationQueueEntry addedEntry = (ConflationQueueEntry) super.add(message, enqueueRecord); final Object keyValue = message.getMessageHeader().getHeader(_conflationKey); if (keyValue != null) @@ -173,7 +175,7 @@ public class LastValueQueueList extends OrderedQueueEntryList if(entry.acquire()) { ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore()); - txn.dequeue(entry.getQueue(),entry.getMessage(), + txn.dequeue(entry.getEnqueueRecord(), new ServerTransaction.Action() { @Override @@ -201,9 +203,11 @@ public class LastValueQueueList extends OrderedQueueEntryList super(queueEntryList); } - public ConflationQueueEntry(LastValueQueueList queueEntryList, ServerMessage message) + public ConflationQueueEntry(LastValueQueueList queueEntryList, + ServerMessage message, + final MessageEnqueueRecord messageEnqueueRecord) { - super(queueEntryList, message); + super(queueEntryList, message, messageEnqueueRecord); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java index 6532b1224b..d45dceb5a8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.store.MessageEnqueueRecord; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -38,14 +39,11 @@ public abstract class OrderedQueueEntry extends QueueEntryImpl super(queueEntryList); } - public OrderedQueueEntry(OrderedQueueEntryList queueEntryList, ServerMessage message, final long entryId) + public OrderedQueueEntry(OrderedQueueEntryList queueEntryList, + ServerMessage message, + final MessageEnqueueRecord messageEnqueueRecord) { - super(queueEntryList, message, entryId); - } - - public OrderedQueueEntry(OrderedQueueEntryList queueEntryList, ServerMessage message) - { - super(queueEntryList, message); + super(queueEntryList, message, messageEnqueueRecord); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java index f3822bc9ca..3e1a535018 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.store.MessageEnqueueRecord; public abstract class OrderedQueueEntryList implements QueueEntryList { @@ -77,9 +78,9 @@ public abstract class OrderedQueueEntryList implements QueueEntryList } - public QueueEntry add(ServerMessage message) + public QueueEntry add(ServerMessage message, final MessageEnqueueRecord enqueueRecord) { - OrderedQueueEntry node = createQueueEntry(message); + OrderedQueueEntry node = createQueueEntry(message, enqueueRecord); for (;;) { OrderedQueueEntry tail = _tail; @@ -104,7 +105,8 @@ public abstract class OrderedQueueEntryList implements QueueEntryList } } - abstract protected OrderedQueueEntry createQueueEntry(ServerMessage<?> message); + abstract protected OrderedQueueEntry createQueueEntry(ServerMessage<?> message, + final MessageEnqueueRecord enqueueRecord); @Override public QueueEntry next(QueueEntry node) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java index d0b5578055..144a9c26e3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.store.MessageEnqueueRecord; abstract public class PriorityQueueList extends OrderedQueueEntryList { @@ -78,7 +79,7 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList } @Override - public PriorityQueueEntry add(ServerMessage message) + public PriorityQueueEntry add(ServerMessage message, final MessageEnqueueRecord enqueueRecord) { int index = message.getMessageHeader().getPriority() - _priorityOffset; if(index >= _priorities) @@ -89,12 +90,13 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList { index = 0; } - return (PriorityQueueEntry) _priorityLists[index].add(message); + return (PriorityQueueEntry) _priorityLists[index].add(message, enqueueRecord); } @Override - protected PriorityQueueEntry createQueueEntry(final ServerMessage<?> message) + protected PriorityQueueEntry createQueueEntry(final ServerMessage<?> message, + final MessageEnqueueRecord enqueueRecord) { throw new UnsupportedOperationException(); } @@ -233,9 +235,10 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList } @Override - protected PriorityQueueEntry createQueueEntry(ServerMessage<?> message) + protected PriorityQueueEntry createQueueEntry(ServerMessage<?> message, + final MessageEnqueueRecord enqueueRecord) { - return new PriorityQueueEntry(this, message); + return new PriorityQueueEntry(this, message, enqueueRecord); } public int getListPriority() @@ -251,9 +254,11 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList super(queueEntryList); } - public PriorityQueueEntry(PriorityQueueEntrySubList queueEntryList, ServerMessage<?> message) + public PriorityQueueEntry(PriorityQueueEntrySubList queueEntryList, + ServerMessage<?> message, + final MessageEnqueueRecord messageEnqueueRecord) { - super(queueEntryList, message); + super(queueEntryList, message, messageEnqueueRecord); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 3ddcb98b53..a90e3d677e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.store.MessageStore; public interface QueueEntry extends MessageInstance, Comparable<QueueEntry> { @@ -35,6 +36,5 @@ public interface QueueEntry extends MessageInstance, Comparable<QueueEntry> QueueEntry getNextValidEntry(); - void setExpiration(long calculatedExpiration); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index c798a1ff88..d17e209034 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -37,6 +37,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -92,15 +93,20 @@ public abstract class QueueEntryImpl implements QueueEntry private static final AtomicIntegerFieldUpdater<QueueEntryImpl> _deliveryCountUpdater = AtomicIntegerFieldUpdater .newUpdater(QueueEntryImpl.class, "_deliveryCount"); + private final MessageEnqueueRecord _enqueueRecord; + public QueueEntryImpl(QueueEntryList queueEntryList) { - this(queueEntryList,null,Long.MIN_VALUE); + this(queueEntryList,null,Long.MIN_VALUE, null); _state = DELETED_STATE; } - public QueueEntryImpl(QueueEntryList queueEntryList, ServerMessage message, final long entryId) + public QueueEntryImpl(QueueEntryList queueEntryList, + ServerMessage message, + final long entryId, + final MessageEnqueueRecord enqueueRecord) { _queueEntryList = queueEntryList; @@ -108,13 +114,17 @@ public abstract class QueueEntryImpl implements QueueEntry _entryIdUpdater.set(this, entryId); populateInstanceProperties(); + _enqueueRecord = enqueueRecord; } - public QueueEntryImpl(QueueEntryList queueEntryList, ServerMessage message) + public QueueEntryImpl(QueueEntryList queueEntryList, + ServerMessage message, + final MessageEnqueueRecord enqueueRecord) { _queueEntryList = queueEntryList; _message = message == null ? null : message.newReference(queueEntryList.getQueue()); populateInstanceProperties(); + _enqueueRecord = enqueueRecord; } private void populateInstanceProperties() @@ -461,7 +471,7 @@ public abstract class QueueEntryImpl implements QueueEntry enqueues = 0; } - txn.dequeue(currentQueue, getMessage(), new ServerTransaction.Action() + txn.dequeue(getEnqueueRecord(), new ServerTransaction.Action() { public void postCommit() { @@ -625,4 +635,9 @@ public abstract class QueueEntryImpl implements QueueEntry } + @Override + public MessageEnqueueRecord getEnqueueRecord() + { + return _enqueueRecord; + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java index 938f170bee..5c7f0e3e28 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java @@ -21,12 +21,13 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.store.MessageEnqueueRecord; public interface QueueEntryList { AMQQueue<?> getQueue(); - QueueEntry add(ServerMessage message); + QueueEntry add(ServerMessage message, final MessageEnqueueRecord enqueueRecord); QueueEntry next(QueueEntry node); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java index 433005a857..f3d91af39c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java @@ -20,6 +20,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.store.MessageEnqueueRecord; /** * An implementation of QueueEntryImpl to be used in SortedQueueEntryList. @@ -46,9 +47,11 @@ public class SortedQueueEntry extends QueueEntryImpl } public SortedQueueEntry(final SortedQueueEntryList queueEntryList, - final ServerMessage message, final long entryId) + final ServerMessage message, + final long entryId, + final MessageEnqueueRecord messageEnqueueRecord) { - super(queueEntryList, message, entryId); + super(queueEntryList, message, entryId, messageEnqueueRecord); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java index ee850455b0..0c8f5418ee 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.SortedQueueEntry.Colour; +import org.apache.qpid.server.store.MessageEnqueueRecord; /** * A sorted implementation of QueueEntryList. @@ -51,7 +52,7 @@ public class SortedQueueEntryList implements QueueEntryList return _queue; } - public SortedQueueEntry add(final ServerMessage message) + public SortedQueueEntry add(final ServerMessage message, final MessageEnqueueRecord enqueueRecord) { synchronized(_lock) { @@ -62,7 +63,7 @@ public class SortedQueueEntryList implements QueueEntryList key = val.toString(); } - final SortedQueueEntry entry = new SortedQueueEntry(this,message, ++_entryId); + final SortedQueueEntry entry = new SortedQueueEntry(this,message, ++_entryId, enqueueRecord); entry.setKey(key); insert(entry); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java index 4b6c7a3fe7..44de152776 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java @@ -25,6 +25,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -54,11 +55,12 @@ public class SortedQueueImpl extends OutOfOrderQueue<SortedQueueImpl> implements @Override protected void doEnqueue(final ServerMessage message, - final Action<? super MessageInstance> action) + final Action<? super MessageInstance> action, + MessageEnqueueRecord record) { synchronized (_sortedQueueLock) { - super.doEnqueue(message, action); + super.doEnqueue(message, action, record); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntry.java index 8669d26110..ea6f62cadb 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntry.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.store.MessageEnqueueRecord; public class StandardQueueEntry extends OrderedQueueEntry { @@ -31,14 +32,9 @@ public class StandardQueueEntry extends OrderedQueueEntry public StandardQueueEntry(final StandardQueueEntryList queueEntryList, final ServerMessage message, - final long entryId) + final MessageEnqueueRecord messageEnqueueRecord) { - super(queueEntryList, message, entryId); - } - - public StandardQueueEntry(final StandardQueueEntryList queueEntryList, final ServerMessage message) - { - super(queueEntryList, message); + super(queueEntryList, message, messageEnqueueRecord); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java index 487bac1a43..802e1d72ea 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.store.MessageEnqueueRecord; public class StandardQueueEntryList extends OrderedQueueEntryList { @@ -40,9 +41,10 @@ public class StandardQueueEntryList extends OrderedQueueEntryList } - protected StandardQueueEntry createQueueEntry(ServerMessage<?> message) + protected StandardQueueEntry createQueueEntry(ServerMessage<?> message, + final MessageEnqueueRecord enqueueRecord) { - return new StandardQueueEntry(this, message); + return new StandardQueueEntry(this, message, enqueueRecord); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 96d2058c05..c6e6e41465 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -35,14 +35,11 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -488,11 +485,11 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } @Override - public StoredMessage addMessage(StorableMessageMetaData metaData) + public <T extends StorableMessageMetaData> MessageHandle<T> addMessage(T metaData) { checkMessageStoreOpen(); - return new StoredJDBCMessage(getNextMessageId(), metaData); + return new StoredJDBCMessage<T>(getNextMessageId(), metaData); } @@ -670,18 +667,18 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } - private void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws StoreException + private void dequeueMessage(ConnectionWrapper connWrapper, final UUID queueId, + Long messageId) throws StoreException { Connection conn = connWrapper.getConnection(); - try { PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY); try { - stmt.setString(1, queue.getId().toString()); + stmt.setString(1, queueId.toString()); stmt.setLong(2, messageId); int results = stmt.executeUpdate(); @@ -689,14 +686,13 @@ public abstract class AbstractJDBCMessageStore implements MessageStore if(results != 1) { - throw new StoreException("Unable to find message with id " + messageId + " on queue " + queue.getName() - + " with id " + queue.getId()); + throw new StoreException("Unable to find message with id " + messageId + + " on queue with id " + queueId); } if (getLogger().isDebugEnabled()) { - getLogger().debug("Dequeuing message " + messageId + " on queue " + queue.getName() - + " with id " + queue.getId()); + getLogger().debug("Dequeuing message " + messageId + " on queue with id " + queueId); } } finally @@ -708,8 +704,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore catch (SQLException e) { getLogger().error("Failed to dequeue: " + e.getMessage(), e); - throw new StoreException("Error deleting enqueued message with id " + messageId + " for queue " + queue.getName() - + " with id " + queue.getId() + " from database", e); + throw new StoreException("Error deleting enqueued message with id " + messageId + " for queue with id " + + queueId + " from database", e); } } @@ -766,7 +762,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } private List<Runnable> recordXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId, - Transaction.Record[] enqueues, Transaction.Record[] dequeues) throws StoreException + Transaction.EnqueueRecord[] enqueues, Transaction.DequeueRecord[] dequeues) throws StoreException { Connection conn = connWrapper.getConnection(); @@ -788,7 +784,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } List<Runnable> postActions = new ArrayList<>(); - for(org.apache.qpid.server.store.Transaction.Record enqueue : enqueues) + for(Transaction.EnqueueRecord enqueue : enqueues) { StoredMessage storedMessage = enqueue.getMessage().getStoredMessage(); if(storedMessage instanceof StoredJDBCMessage) @@ -809,7 +805,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore if(enqueues != null) { stmt.setString(4, "E"); - for(Transaction.Record record : enqueues) + for(Transaction.EnqueueRecord record : enqueues) { stmt.setString(5, record.getResource().getId().toString()); stmt.setLong(6, record.getMessage().getMessageNumber()); @@ -820,10 +816,10 @@ public abstract class AbstractJDBCMessageStore implements MessageStore if(dequeues != null) { stmt.setString(4, "D"); - for(Transaction.Record record : dequeues) + for(Transaction.DequeueRecord record : dequeues) { - stmt.setString(5, record.getResource().getId().toString()); - stmt.setLong(6, record.getMessage().getMessageNumber()); + stmt.setString(5, record.getEnqueueRecord().getQueueId().toString()); + stmt.setLong(6, record.getEnqueueRecord().getMessageNumber()); stmt.executeUpdate(); } } @@ -1059,9 +1055,10 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } - private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueueableMessage + private static class RecordImpl implements Transaction.EnqueueRecord, Transaction.DequeueRecord, TransactionLogResource, EnqueueableMessage { + private final JDBCEnqueueRecord _record; private long _messageNumber; private UUID _queueId; @@ -1069,6 +1066,13 @@ public abstract class AbstractJDBCMessageStore implements MessageStore { _messageNumber = messageNumber; _queueId = queueId; + _record = new JDBCEnqueueRecord(queueId, messageNumber); + } + + @Override + public MessageEnqueueRecord getEnqueueRecord() + { + return _record; } @Override @@ -1323,7 +1327,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } @Override - public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) + public MessageEnqueueRecord enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) { checkMessageStoreOpen(); @@ -1349,15 +1353,17 @@ public abstract class AbstractJDBCMessageStore implements MessageStore }); } AbstractJDBCMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber()); - + return new JDBCEnqueueRecord(queue.getId(), message.getMessageNumber()); } @Override - public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) + public void dequeueMessage(final MessageEnqueueRecord enqueueRecord) { checkMessageStoreOpen(); - AbstractJDBCMessageStore.this.dequeueMessage(_connWrapper, queue, message.getMessageNumber()); + AbstractJDBCMessageStore.this.dequeueMessage(_connWrapper, + enqueueRecord.getQueueId(), + enqueueRecord.getMessageNumber()); } @Override @@ -1408,22 +1414,93 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } @Override - public void removeXid(long format, byte[] globalId, byte[] branchId) + public void removeXid(final StoredXidRecord record) { checkMessageStoreOpen(); - AbstractJDBCMessageStore.this.removeXid(_connWrapper, format, globalId, branchId); + AbstractJDBCMessageStore.this.removeXid(_connWrapper, + record.getFormat(), + record.getGlobalId(), + record.getBranchId()); } @Override - public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) + public StoredXidRecord recordXid(final long format, + final byte[] globalId, + final byte[] branchId, + EnqueueRecord[] enqueues, + DequeueRecord[] dequeues) { checkMessageStoreOpen(); _postCommitActions.addAll(AbstractJDBCMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues)); + return new JDBCStoredXidRecord(format, globalId, branchId); } + + } + private static class JDBCStoredXidRecord implements Transaction.StoredXidRecord + { + private final long _format; + private final byte[] _globalId; + private final byte[] _branchId; + + public JDBCStoredXidRecord(final long format, final byte[] globalId, final byte[] branchId) + { + _format = format; + _globalId = globalId; + _branchId = branchId; + } + + @Override + public long getFormat() + { + return _format; + } + + @Override + public byte[] getGlobalId() + { + return _globalId; + } + + @Override + public byte[] getBranchId() + { + return _branchId; + } + + + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + final JDBCStoredXidRecord that = (JDBCStoredXidRecord) o; + + return _format == that._format + && Arrays.equals(_globalId, that._globalId) + && Arrays.equals(_branchId, that._branchId); + + } + + @Override + public int hashCode() + { + int result = (int) (_format ^ (_format >>> 32)); + result = 31 * result + Arrays.hashCode(_globalId); + result = 31 * result + Arrays.hashCode(_branchId); + return result; + } + } static interface MessageDataRef<T extends StorableMessageMetaData> { @@ -1540,7 +1617,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } } - private class StoredJDBCMessage<T extends StorableMessageMetaData> implements StoredMessage<T> + private class StoredJDBCMessage<T extends StorableMessageMetaData> implements StoredMessage<T>, MessageHandle<T> { private final long _messageId; @@ -1597,7 +1674,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } @Override - public void addContent(int offsetInMessage, ByteBuffer src) + public void addContent(ByteBuffer src) { src = src.slice(); byte[] data = _messageDataRef.getData(); @@ -1622,6 +1699,12 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } @Override + public StoredMessage<T> allContentAdded() + { + return this; + } + + @Override public int getContent(int offsetInMessage, ByteBuffer dst) { byte[] data = _messageDataRef.getData(); @@ -1809,286 +1892,302 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } @Override - public StoredMessage<?> getMessage(long messageId) throws StoreException + public MessageStoreReader newMessageStoreReader() { - checkMessageStoreOpen(); + return new JDBCMessageStoreReader(); + } - Connection conn = null; - StoredJDBCMessage message; - try + private class JDBCMessageStoreReader implements MessageStoreReader + { + + @Override + public StoredMessage<?> getMessage(long messageId) throws StoreException { - conn = newAutoCommitConnection(); - try (PreparedStatement stmt = conn.prepareStatement(SELECT_ONE_FROM_META_DATA)) + checkMessageStoreOpen(); + + Connection conn = null; + StoredJDBCMessage message; + try { - stmt.setLong(1, messageId); - try (ResultSet rs = stmt.executeQuery()) + conn = newAutoCommitConnection(); + try (PreparedStatement stmt = conn.prepareStatement(SELECT_ONE_FROM_META_DATA)) { - if (rs.next()) + stmt.setLong(1, messageId); + try (ResultSet rs = stmt.executeQuery()) { - byte[] dataAsBytes = getBlobAsBytes(rs, 2); - ByteBuffer buf = ByteBuffer.wrap(dataAsBytes); - buf.position(1); - buf = buf.slice(); - MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]); - StorableMessageMetaData metaData = type.createMetaData(buf); - message = new StoredJDBCMessage(messageId, metaData, true); + if (rs.next()) + { + byte[] dataAsBytes = getBlobAsBytes(rs, 2); + ByteBuffer buf = ByteBuffer.wrap(dataAsBytes); + buf.position(1); + buf = buf.slice(); + MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]); + StorableMessageMetaData metaData = type.createMetaData(buf); + message = new StoredJDBCMessage(messageId, metaData, true); - } - else - { - message = null; + } + else + { + message = null; + } } } + return message; + } + catch (SQLException e) + { + throw new StoreException("Error encountered when visiting messages", e); + } + finally + { + JdbcUtils.closeConnection(conn, getLogger()); } - return message; - } - catch (SQLException e) - { - throw new StoreException("Error encountered when visiting messages", e); } - finally + + @Override + public void close() { - JdbcUtils.closeConnection(conn, getLogger()); - } - } + } - @Override - public void visitMessages(MessageHandler handler) throws StoreException - { - checkMessageStoreOpen(); - Connection conn = null; - try + @Override + public void visitMessages(MessageHandler handler) throws StoreException { - conn = newAutoCommitConnection(); - Statement stmt = conn.createStatement(); + checkMessageStoreOpen(); + + Connection conn = null; try { - ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA); + conn = newAutoCommitConnection(); + Statement stmt = conn.createStatement(); try { - while (rs.next()) + ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA); + try { - long messageId = rs.getLong(1); - byte[] dataAsBytes = getBlobAsBytes(rs, 2); - ByteBuffer buf = ByteBuffer.wrap(dataAsBytes); - buf.position(1); - buf = buf.slice(); - MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]); - StorableMessageMetaData metaData = type.createMetaData(buf); - StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true); - if (!handler.handle(message)) + while (rs.next()) { - break; + long messageId = rs.getLong(1); + byte[] dataAsBytes = getBlobAsBytes(rs, 2); + ByteBuffer buf = ByteBuffer.wrap(dataAsBytes); + buf.position(1); + buf = buf.slice(); + MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]); + StorableMessageMetaData metaData = type.createMetaData(buf); + StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true); + if (!handler.handle(message)) + { + break; + } } } + finally + { + rs.close(); + } } finally { - rs.close(); + stmt.close(); } } + catch (SQLException e) + { + throw new StoreException("Error encountered when visiting messages", e); + } finally { - stmt.close(); + JdbcUtils.closeConnection(conn, getLogger()); } } - catch (SQLException e) - { - throw new StoreException("Error encountered when visiting messages", e); - } - finally - { - JdbcUtils.closeConnection(conn, getLogger()); - } - } - @Override - public void visitMessageInstances(TransactionLogResource queue, MessageInstanceHandler handler) throws StoreException - { - checkMessageStoreOpen(); - - Connection conn = null; - try + @Override + public void visitMessageInstances(TransactionLogResource queue, MessageInstanceHandler handler) + throws StoreException { - conn = newAutoCommitConnection(); - PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_QUEUE_ENTRY_FOR_QUEUE); + checkMessageStoreOpen(); + + Connection conn = null; try { - stmt.setString(1, queue.getId().toString()); - ResultSet rs = stmt.executeQuery(); + conn = newAutoCommitConnection(); + PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_QUEUE_ENTRY_FOR_QUEUE); try { - while(rs.next()) + stmt.setString(1, queue.getId().toString()); + ResultSet rs = stmt.executeQuery(); + try { - String id = rs.getString(1); - long messageId = rs.getLong(2); - if (!handler.handle(UUID.fromString(id), messageId)) + while (rs.next()) { - break; + String id = rs.getString(1); + long messageId = rs.getLong(2); + if (!handler.handle(new JDBCEnqueueRecord(UUID.fromString(id), messageId))) + { + break; + } } } + finally + { + rs.close(); + } } finally { - rs.close(); + stmt.close(); } } + catch (SQLException e) + { + throw new StoreException("Error encountered when visiting message instances", e); + } finally { - stmt.close(); + JdbcUtils.closeConnection(conn, getLogger()); } - } - catch(SQLException e) - { - throw new StoreException("Error encountered when visiting message instances", e); - } - finally - { - JdbcUtils.closeConnection(conn, getLogger()); - } - - } - @Override - public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException - { - checkMessageStoreOpen(); + } - Connection conn = null; - try + @Override + public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException { - conn = newAutoCommitConnection(); - Statement stmt = conn.createStatement(); + checkMessageStoreOpen(); + + Connection conn = null; try { - ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY); + conn = newAutoCommitConnection(); + Statement stmt = conn.createStatement(); try { - while(rs.next()) + ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY); + try { - String id = rs.getString(1); - long messageId = rs.getLong(2); - if (!handler.handle(UUID.fromString(id), messageId)) + while (rs.next()) { - break; + String id = rs.getString(1); + long messageId = rs.getLong(2); + if (!handler.handle(new JDBCEnqueueRecord(UUID.fromString(id), messageId))) + { + break; + } } } + finally + { + rs.close(); + } } finally { - rs.close(); + stmt.close(); } } + catch (SQLException e) + { + throw new StoreException("Error encountered when visiting message instances", e); + } finally { - stmt.close(); + JdbcUtils.closeConnection(conn, getLogger()); } } - catch(SQLException e) - { - throw new StoreException("Error encountered when visiting message instances", e); - } - finally - { - JdbcUtils.closeConnection(conn, getLogger()); - } - } - - @Override - public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException - { - checkMessageStoreOpen(); - Connection conn = null; - try + @Override + public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException { - conn = newAutoCommitConnection(); - List<Xid> xids = new ArrayList<Xid>(); + checkMessageStoreOpen(); - Statement stmt = conn.createStatement(); + Connection conn = null; try { - ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS); + conn = newAutoCommitConnection(); + List<Xid> xids = new ArrayList<Xid>(); + + Statement stmt = conn.createStatement(); try { - while(rs.next()) + ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS); + try { + while (rs.next()) + { - long format = rs.getLong(1); - byte[] globalId = rs.getBytes(2); - byte[] branchId = rs.getBytes(3); - xids.add(new Xid(format, globalId, branchId)); + long format = rs.getLong(1); + byte[] globalId = rs.getBytes(2); + byte[] branchId = rs.getBytes(3); + xids.add(new Xid(format, globalId, branchId)); + } + } + finally + { + rs.close(); } } finally { - rs.close(); + stmt.close(); } - } - finally - { - stmt.close(); - } - - for(Xid xid : xids) - { - List<RecordImpl> enqueues = new ArrayList<RecordImpl>(); - List<RecordImpl> dequeues = new ArrayList<RecordImpl>(); - - PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS); - - try + for (Xid xid : xids) { - pstmt.setLong(1, xid.getFormat()); - pstmt.setBytes(2, xid.getGlobalId()); - pstmt.setBytes(3, xid.getBranchId()); + List<RecordImpl> enqueues = new ArrayList<RecordImpl>(); + List<RecordImpl> dequeues = new ArrayList<RecordImpl>(); + + PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS); - ResultSet rs = pstmt.executeQuery(); try { - while(rs.next()) + pstmt.setLong(1, xid.getFormat()); + pstmt.setBytes(2, xid.getGlobalId()); + pstmt.setBytes(3, xid.getBranchId()); + + ResultSet rs = pstmt.executeQuery(); + try { + while (rs.next()) + { - String actionType = rs.getString(1); - UUID queueId = UUID.fromString(rs.getString(2)); - long messageId = rs.getLong(3); + String actionType = rs.getString(1); + UUID queueId = UUID.fromString(rs.getString(2)); + long messageId = rs.getLong(3); - RecordImpl record = new RecordImpl(queueId, messageId); - List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues; - records.add(record); + RecordImpl record = new RecordImpl(queueId, messageId); + List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues; + records.add(record); + } + } + finally + { + rs.close(); } } finally { - rs.close(); + pstmt.close(); } - } - finally - { - pstmt.close(); - } - if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), - enqueues.toArray(new RecordImpl[enqueues.size()]), - dequeues.toArray(new RecordImpl[dequeues.size()]))) - { - break; + if (!handler.handle(new JDBCStoredXidRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId()), + enqueues.toArray(new RecordImpl[enqueues.size()]), + dequeues.toArray(new RecordImpl[dequeues.size()]))) + { + break; + } } - } - } - catch (SQLException e) - { - throw new StoreException("Error encountered when visiting distributed transactions", e); + } + catch (SQLException e) + { + throw new StoreException("Error encountered when visiting distributed transactions", e); - } - finally - { - JdbcUtils.closeConnection(conn, getLogger()); + } + finally + { + JdbcUtils.closeConnection(conn, getLogger()); + } } } @@ -2135,4 +2234,26 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } + private static class JDBCEnqueueRecord implements MessageEnqueueRecord + { + private final UUID _queueId; + private final long _messageNumber; + + public JDBCEnqueueRecord(final UUID queueId, + final long messageNumber) + { + _queueId = queueId; + _messageNumber = messageNumber; + } + + public UUID getQueueId() + { + return _queueId; + } + + public long getMessageNumber() + { + return _messageNumber; + } + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index eb887b4ef5..650602e23c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.store; import java.io.File; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -65,7 +66,7 @@ public class MemoryMessageStore implements MessageStore } @Override - public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) + public MessageEnqueueRecord enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) { if(message.getStoredMessage() instanceof StoredMemoryMessage) @@ -80,18 +81,24 @@ public class MemoryMessageStore implements MessageStore _localEnqueueMap.put(queue.getId(), messageIds); } messageIds.add(message.getMessageNumber()); + return new MemoryEnqueueRecord(queue.getId(), message.getMessageNumber()); } @Override - public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) + public void dequeueMessage(final MessageEnqueueRecord enqueueRecord) { - Set<Long> messageIds = _localDequeueMap.get(queue.getId()); + dequeueMessage(enqueueRecord.getQueueId(), enqueueRecord.getMessageNumber()); + } + + private void dequeueMessage(final UUID queueId, final long messageNumber) + { + Set<Long> messageIds = _localDequeueMap.get(queueId); if (messageIds == null) { messageIds = new HashSet<Long>(); - _localDequeueMap.put(queue.getId(), messageIds); + _localDequeueMap.put(queueId, messageIds); } - messageIds.add(message.getMessageNumber()); + messageIds.add(messageNumber); } @Override @@ -110,36 +117,106 @@ public class MemoryMessageStore implements MessageStore } @Override - public void removeXid(long format, byte[] globalId, byte[] branchId) + public void removeXid(final StoredXidRecord record) { - _localDistributedTransactionsRemoves.add(new Xid(format, globalId, branchId)); + _localDistributedTransactionsRemoves.add(new Xid(record.getFormat(), + record.getGlobalId(), + record.getBranchId())); } @Override - public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) + public StoredXidRecord recordXid(final long format, + final byte[] globalId, + final byte[] branchId, + EnqueueRecord[] enqueues, + DequeueRecord[] dequeues) { _localDistributedTransactionsRecords.put(new Xid(format, globalId, branchId), new DistributedTransactionRecords(enqueues, dequeues)); + return new MemoryStoredXidRecord(format, globalId, branchId); } + + } + private static class MemoryStoredXidRecord implements Transaction.StoredXidRecord + { + private final long _format; + private final byte[] _globalId; + private final byte[] _branchId; + + public MemoryStoredXidRecord(final long format, final byte[] globalId, final byte[] branchId) + { + _format = format; + _globalId = globalId; + _branchId = branchId; + } + + @Override + public long getFormat() + { + return _format; + } + + @Override + public byte[] getGlobalId() + { + return _globalId; + } + + @Override + public byte[] getBranchId() + { + return _branchId; + } + + + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + final MemoryStoredXidRecord that = (MemoryStoredXidRecord) o; + + return _format == that._format + && Arrays.equals(_globalId, that._globalId) + && Arrays.equals(_branchId, that._branchId); + + } + + @Override + public int hashCode() + { + int result = (int) (_format ^ (_format >>> 32)); + result = 31 * result + Arrays.hashCode(_globalId); + result = 31 * result + Arrays.hashCode(_branchId); + return result; + } + } private static final class DistributedTransactionRecords { - private Transaction.Record[] _enqueues; - private Transaction.Record[] _dequeues; + private Transaction.EnqueueRecord[] _enqueues; + private Transaction.DequeueRecord[] _dequeues; - public DistributedTransactionRecords(Transaction.Record[] enqueues, Transaction.Record[] dequeues) + public DistributedTransactionRecords(Transaction.EnqueueRecord[] enqueues, Transaction.DequeueRecord[] dequeues) { super(); _enqueues = enqueues; _dequeues = dequeues; } - public Transaction.Record[] getEnqueues() + public Transaction.EnqueueRecord[] getEnqueues() { return _enqueues; } - public Transaction.Record[] getDequeues() + public Transaction.DequeueRecord[] getDequeues() { return _dequeues; } @@ -201,7 +278,7 @@ public class MemoryMessageStore implements MessageStore } @Override - public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData) + public <T extends StorableMessageMetaData> MessageHandle<T> addMessage(final T metaData) { long id = getNextMessageId(); @@ -273,81 +350,121 @@ public class MemoryMessageStore implements MessageStore } @Override - public void visitMessages(final MessageHandler handler) throws StoreException + public MessageStoreReader newMessageStoreReader() { - for (StoredMemoryMessage message : _messages.values()) - { - if(!handler.handle(message)) - { - break; - } - } + return new MemoryMessageStoreReader(); } - @Override - public StoredMessage<?> getMessage(final long messageId) + + private static class MemoryEnqueueRecord implements MessageEnqueueRecord { - return _messages.get(messageId); + private final UUID _queueId; + private final long _messageNumber; + + public MemoryEnqueueRecord(final UUID queueId, + final long messageNumber) + { + _queueId = queueId; + _messageNumber = messageNumber; + } + + public UUID getQueueId() + { + return _queueId; + } + + public long getMessageNumber() + { + return _messageNumber; + } } - @Override - public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException + private class MemoryMessageStoreReader implements MessageStoreReader { - synchronized (_transactionLock) + @Override + public StoredMessage<?> getMessage(final long messageId) + { + return _messages.get(messageId); + } + + @Override + public void close() { - for (Map.Entry<UUID, Set<Long>> enqueuedEntry : _messageInstances.entrySet()) + + } + + @Override + public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException + { + synchronized (_transactionLock) { - UUID resourceId = enqueuedEntry.getKey(); - for (Long messageId : enqueuedEntry.getValue()) + for (Map.Entry<UUID, Set<Long>> enqueuedEntry : _messageInstances.entrySet()) { - if (!handler.handle(resourceId, messageId)) + UUID resourceId = enqueuedEntry.getKey(); + for (Long messageId : enqueuedEntry.getValue()) { - return; + if (!handler.handle(new MemoryEnqueueRecord(resourceId, messageId))) + { + return; + } } } } } - } - @Override - public void visitMessageInstances(TransactionLogResource queue, final MessageInstanceHandler handler) throws StoreException - { - synchronized (_transactionLock) + @Override + public void visitMessageInstances(TransactionLogResource queue, final MessageInstanceHandler handler) throws StoreException { - Set<Long> ids = _messageInstances.get(queue.getId()); - if(ids != null) + synchronized (_transactionLock) { - for (long id : ids) + Set<Long> ids = _messageInstances.get(queue.getId()); + if(ids != null) { - if (!handler.handle(queue.getId(), id)) + for (long id : ids) { - return; - } + if (!handler.handle(new MemoryEnqueueRecord(queue.getId(), id))) + { + return; + } + } } } } - } - @Override - public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException - { - synchronized (_transactionLock) + @Override + public void visitMessages(final MessageHandler handler) throws StoreException { - for (Map.Entry<Xid, DistributedTransactionRecords> entry : _distributedTransactions.entrySet()) + for (StoredMemoryMessage message : _messages.values()) { - Xid xid = entry.getKey(); - DistributedTransactionRecords records = entry.getValue(); - if (!handler.handle(xid.getFormat(), - xid.getGlobalId(), - xid.getBranchId(), - records.getEnqueues(), - records.getDequeues())) + if(!handler.handle(message)) { break; } } } + + @Override + public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException + { + synchronized (_transactionLock) + { + for (Map.Entry<Xid, DistributedTransactionRecords> entry : _distributedTransactions.entrySet()) + { + Xid xid = entry.getKey(); + DistributedTransactionRecords records = entry.getValue(); + if (!handler.handle(new MemoryStoredXidRecord(xid.getFormat(), + xid.getGlobalId(), + xid.getBranchId()), + records.getEnqueues(), + records.getDequeues())) + { + break; + } + } + } + } + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageEnqueueRecord.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageEnqueueRecord.java new file mode 100644 index 0000000000..e48c50b13a --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageEnqueueRecord.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.UUID; + +public interface MessageEnqueueRecord +{ + UUID getQueueId(); + long getMessageNumber(); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageHandle.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageHandle.java new file mode 100644 index 0000000000..151a819dee --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageHandle.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.nio.ByteBuffer; + +public interface MessageHandle<M extends StorableMessageMetaData> +{ + + void addContent(ByteBuffer src); + + StoredMessage<M> allContentAdded(); + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java index 28a330b205..8439f85712 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.store; import java.io.File; +import java.util.UUID; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.store.handler.DistributedTransactionHandler; @@ -59,15 +60,7 @@ public interface MessageStore */ void upgradeStoreStructure() throws StoreException; - void visitMessages(MessageHandler handler) throws StoreException; - - void visitMessageInstances(MessageInstanceHandler handler) throws StoreException; - void visitMessageInstances(TransactionLogResource queue, MessageInstanceHandler handler) throws StoreException; - - void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException; - - <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData); - StoredMessage<?> getMessage(long messageId); + <T extends StorableMessageMetaData> MessageHandle<T> addMessage(T metaData); /** * Is this store capable of persisting the data @@ -84,4 +77,20 @@ public interface MessageStore void closeMessageStore(); void onDelete(ConfiguredObject<?> parent); + + MessageStoreReader newMessageStoreReader(); + + interface MessageStoreReader + { + void visitMessages(MessageHandler handler) throws StoreException; + + void visitMessageInstances(MessageInstanceHandler handler) throws StoreException; + void visitMessageInstances(TransactionLogResource queue, MessageInstanceHandler handler) throws StoreException; + + void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException; + + StoredMessage<?> getMessage(long messageId); + void close(); + } + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java index 83d61fc3fe..b8ca8b0834 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java @@ -28,7 +28,7 @@ import org.apache.qpid.server.store.handler.DistributedTransactionHandler; import org.apache.qpid.server.store.handler.MessageHandler; import org.apache.qpid.server.store.handler.MessageInstanceHandler; -public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore, MessageStoreProvider +public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore, MessageStoreProvider, MessageStore.MessageStoreReader { private ConfiguredObjectRecord[] _initialRecords; @@ -89,7 +89,7 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura } @Override - public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData) + public <T extends StorableMessageMetaData> MessageHandle<T> addMessage(T metaData) { return null; } @@ -145,6 +145,7 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura handler.end(); } + @Override public void visitMessages(MessageHandler handler) throws StoreException { @@ -176,4 +177,16 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura { return null; } + + @Override + public MessageStoreReader newMessageStoreReader() + { + return this; + } + + @Override + public void close() + { + + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java index e8402c9268..698cf5b9e6 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java @@ -23,7 +23,9 @@ package org.apache.qpid.server.store; import java.nio.ByteBuffer; -public class StoredMemoryMessage<T extends StorableMessageMetaData> implements StoredMessage<T> +import org.apache.qpid.transport.util.Functions; + +public class StoredMemoryMessage<T extends StorableMessageMetaData> implements StoredMessage<T>, MessageHandle<T> { private final long _messageNumber; private ByteBuffer _content; @@ -40,48 +42,45 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S return _messageNumber; } - public void addContent(int offsetInMessage, ByteBuffer src) + public void addContent(ByteBuffer src) { if(_content == null) { - if(offsetInMessage == 0) - { - _content = src.slice(); - } - else - { - final int contentSize = _metaData.getContentSize(); - int size = (contentSize < offsetInMessage + src.remaining()) - ? offsetInMessage + src.remaining() - : contentSize; - _content = ByteBuffer.allocate(size); - addContent(offsetInMessage, src); - } + _content = src.slice(); + _content.position(_content.limit()); } else { - if(_content.limit() >= offsetInMessage + src.remaining()) + if(_content.remaining() >= src.remaining()) { - _content.position(offsetInMessage); - _content.put(src); - _content.position(0); + _content.put(src.duplicate()); } else { final int contentSize = _metaData.getContentSize(); - int size = (contentSize < offsetInMessage + src.remaining()) - ? offsetInMessage + src.remaining() + int size = (contentSize < _content.position() + src.remaining()) + ? _content.position() + src.remaining() : contentSize; ByteBuffer oldContent = _content; + oldContent.flip(); _content = ByteBuffer.allocate(size); _content.put(oldContent); - _content.position(0); - addContent(offsetInMessage, src); + _content.put(src.duplicate()); } } } + @Override + public StoredMessage<T> allContentAdded() + { + if(_content != null) + { + _content.flip(); + } + return this; + } + public int getContent(int offset, ByteBuffer dst) { if(_content == null) @@ -142,4 +141,5 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S { return false; } + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java index 7561b4a11c..e71061ba9d 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java @@ -28,8 +28,6 @@ public interface StoredMessage<M extends StorableMessageMetaData> public long getMessageNumber(); - void addContent(int offsetInMessage, ByteBuffer src); - int getContent(int offsetInMessage, ByteBuffer dst); ByteBuffer getContent(int offsetInMessage, int size); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java index 007f3ab796..4ea751a663 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java @@ -25,24 +25,17 @@ import org.apache.qpid.server.util.FutureResult; public interface Transaction { + /** * Places a message onto a specified queue, in a given transactional context. * * - * - * @param queue The queue to place the message on. + * @param queue The queue to place the message on. * @param message */ - void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message); - - /** - * Extracts a message from a specified queue, in a given transactional context. - * - * @param queue The queue to place the message on. - * @param message The message to dequeue. - */ - void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message); + MessageEnqueueRecord enqueueMessage(TransactionLogResource queue, EnqueueableMessage message); + void dequeueMessage(MessageEnqueueRecord enqueueRecord); /** * Commits all operations performed within a given transactional context. @@ -63,14 +56,28 @@ public interface Transaction void abortTran(); - public static interface Record + interface EnqueueRecord { TransactionLogResource getResource(); EnqueueableMessage getMessage(); } - void removeXid(long format, byte[] globalId, byte[] branchId); + interface DequeueRecord + { + MessageEnqueueRecord getEnqueueRecord(); + } + + void removeXid(StoredXidRecord record); + + + StoredXidRecord recordXid(long format, byte[] globalId, byte[] branchId, EnqueueRecord[] enqueues, + DequeueRecord[] dequeues); - void recordXid(long format, byte[] globalId, byte[] branchId, Transaction.Record[] enqueues, - Transaction.Record[] dequeues); + interface StoredXidRecord + { + long getFormat(); + byte[] getGlobalId(); + byte[] getBranchId(); + + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/DistributedTransactionHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/DistributedTransactionHandler.java index 733c93355b..96b2702a20 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/DistributedTransactionHandler.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/DistributedTransactionHandler.java @@ -20,11 +20,13 @@ */ package org.apache.qpid.server.store.handler; -import org.apache.qpid.server.store.Transaction.Record; +import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.store.Transaction.DequeueRecord; +import org.apache.qpid.server.store.Transaction.EnqueueRecord; public interface DistributedTransactionHandler { - boolean handle(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues); + boolean handle(Transaction.StoredXidRecord storedXid, EnqueueRecord[] enqueues, DequeueRecord[] dequeues); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageInstanceHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageInstanceHandler.java index 3775ec4fee..ea88468d3e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageInstanceHandler.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageInstanceHandler.java @@ -20,10 +20,11 @@ */ package org.apache.qpid.server.store.handler; -import java.util.UUID; + +import org.apache.qpid.server.store.MessageEnqueueRecord; public interface MessageInstanceHandler { - boolean handle(UUID queueId, long messageId); + boolean handle(MessageEnqueueRecord record); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java index 97e544d566..8b36ebce1b 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java @@ -28,8 +28,8 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.Transaction; @@ -88,21 +88,21 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } - public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction) + public void dequeue(MessageEnqueueRecord record, Action postTransactionAction) { Transaction txn = null; try { FutureResult future; - if(queue.getMessageDurability().persist(message.isPersistent())) + if(record != null) { if (_logger.isDebugEnabled()) { - _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName()); + _logger.debug("Dequeue of message number " + record.getMessageNumber() + " from transaction log. Queue : " + record.getQueueId()); } txn = _messageStore.newTransaction(); - txn.dequeueMessage(queue, message); + txn.dequeueMessage(record); future = txn.commitTranAsync(); txn = null; @@ -121,6 +121,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } + private void addFuture(final FutureResult future, final Action action) { if(action != null) @@ -160,14 +161,13 @@ public class AsyncAutoCommitTransaction implements ServerTransaction { for(MessageInstance entry : queueEntries) { - ServerMessage message = entry.getMessage(); - TransactionLogResource queue = entry.getOwningResource(); + MessageEnqueueRecord record = entry.getEnqueueRecord(); - if(queue.getMessageDurability().persist(message.isPersistent())) + if(record != null) { if (_logger.isDebugEnabled()) { - _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName()); + _logger.debug("Dequeue of message number " + record.getMessageNumber() + " from transaction log. Queue : " + record.getQueueId()); } if(txn == null) @@ -175,7 +175,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction txn = _messageStore.newTransaction(); } - txn.dequeueMessage(queue, message); + txn.dequeueMessage(record); } } @@ -200,12 +200,13 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } - public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction) + public void enqueue(TransactionLogResource queue, EnqueueableMessage message, EnqueueAction postTransactionAction) { Transaction txn = null; try { FutureResult future; + final MessageEnqueueRecord enqueueRecord; if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) @@ -214,30 +215,65 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } txn = _messageStore.newTransaction(); - txn.enqueueMessage(queue, message); + enqueueRecord = txn.enqueueMessage(queue, message); future = txn.commitTranAsync(); txn = null; } else { future = FutureResult.IMMEDIATE_FUTURE; + enqueueRecord = null; } - addEnqueueFuture(future, postTransactionAction, message.isPersistent()); + final EnqueueAction underlying = postTransactionAction; + addEnqueueFuture(future, new Action() + { + @Override + public void postCommit() + { + underlying.postCommit(enqueueRecord); + } + + @Override + public void onRollback() + { + underlying.postCommit(enqueueRecord); + } + }, message.isPersistent()); postTransactionAction = null; - }finally + } + finally { - rollbackIfNecessary(postTransactionAction, txn); + final EnqueueAction underlying = postTransactionAction; + + rollbackIfNecessary(new Action() + { + @Override + public void postCommit() + { + + } + + @Override + public void onRollback() + { + if(underlying != null) + { + underlying.onRollback(); + } + } + }, txn); } } - public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message, Action postTransactionAction) + public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message, EnqueueAction postTransactionAction) { Transaction txn = null; try { - + final MessageEnqueueRecord[] records = new MessageEnqueueRecord[queues.size()]; + int i = 0; for(BaseQueue queue : queues) { if (queue.getMessageDurability().persist(message.isPersistent())) @@ -250,10 +286,11 @@ public class AsyncAutoCommitTransaction implements ServerTransaction { txn = _messageStore.newTransaction(); } - txn.enqueueMessage(queue, message); + records[i] = txn.enqueueMessage(queue, message); } + i++; } FutureResult future; @@ -266,13 +303,49 @@ public class AsyncAutoCommitTransaction implements ServerTransaction { future = FutureResult.IMMEDIATE_FUTURE; } - addEnqueueFuture(future, postTransactionAction, message.isPersistent()); + final EnqueueAction underlying = postTransactionAction; + addEnqueueFuture(future, new Action() + { + @Override + public void postCommit() + { + if(underlying != null) + { + underlying.postCommit(records); + } + } + + @Override + public void onRollback() + { + underlying.onRollback(); + } + }, message.isPersistent()); postTransactionAction = null; - }finally + } + finally { - rollbackIfNecessary(postTransactionAction, txn); + final EnqueueAction underlying = postTransactionAction; + + rollbackIfNecessary(new Action() + { + @Override + public void postCommit() + { + + } + + @Override + public void onRollback() + { + if(underlying != null) + { + underlying.onRollback(); + } + } + }, txn); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java index 81db5b0358..9c06c1f273 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java @@ -28,8 +28,8 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; @@ -73,20 +73,20 @@ public class AutoCommitTransaction implements ServerTransaction immediateAction.postCommit(); } - public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction) + public void dequeue(MessageEnqueueRecord record, Action postTransactionAction) { Transaction txn = null; try { - if(queue.getMessageDurability().persist(message.isPersistent())) + if(record != null) { if (_logger.isDebugEnabled()) { - _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName()); + _logger.debug("Dequeue of message number " + record.getMessageNumber() + " from transaction log. Queue : " + record.getQueueId()); } txn = _messageStore.newTransaction(); - txn.dequeueMessage(queue, message); + txn.dequeueMessage(record); txn.commitTran(); txn = null; } @@ -100,6 +100,7 @@ public class AutoCommitTransaction implements ServerTransaction } + public void dequeue(Collection<MessageInstance> queueEntries, Action postTransactionAction) { Transaction txn = null; @@ -107,14 +108,12 @@ public class AutoCommitTransaction implements ServerTransaction { for(MessageInstance entry : queueEntries) { - ServerMessage message = entry.getMessage(); - TransactionLogResource queue = entry.getOwningResource(); - - if(queue.getMessageDurability().persist(message.isPersistent())) + MessageEnqueueRecord enqueueRecord = entry.getEnqueueRecord(); + if(enqueueRecord != null) { if (_logger.isDebugEnabled()) { - _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName()); + _logger.debug("Dequeue of message number " + enqueueRecord.getMessageNumber() + " from transaction log. Queue : " + enqueueRecord.getQueueId()); } if(txn == null) @@ -122,7 +121,7 @@ public class AutoCommitTransaction implements ServerTransaction txn = _messageStore.newTransaction(); } - txn.dequeueMessage(queue, message); + txn.dequeueMessage(enqueueRecord); } } @@ -142,11 +141,12 @@ public class AutoCommitTransaction implements ServerTransaction } - public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction) + public void enqueue(TransactionLogResource queue, EnqueueableMessage message, EnqueueAction postTransactionAction) { Transaction txn = null; try { + final MessageEnqueueRecord record; if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) @@ -155,27 +155,52 @@ public class AutoCommitTransaction implements ServerTransaction } txn = _messageStore.newTransaction(); - txn.enqueueMessage(queue, message); + record = txn.enqueueMessage(queue, message); txn.commitTran(); txn = null; } - postTransactionAction.postCommit(); + else + { + record = null; + } + if(postTransactionAction != null) + { + postTransactionAction.postCommit(record); + } postTransactionAction = null; } finally { - rollbackIfNecessary(postTransactionAction, txn); + final EnqueueAction underlying = postTransactionAction; + rollbackIfNecessary(new Action() + { + @Override + public void postCommit() + { + + } + + @Override + public void onRollback() + { + if(underlying != null) + { + underlying.onRollback(); + } + } + }, txn); } } - public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message, Action postTransactionAction) + public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message, EnqueueAction postTransactionAction) { Transaction txn = null; try { - + MessageEnqueueRecord[] enqueueRecords = new MessageEnqueueRecord[queues.size()]; + int i = 0; for(BaseQueue queue : queues) { if (queue.getMessageDurability().persist(message.isPersistent())) @@ -188,11 +213,11 @@ public class AutoCommitTransaction implements ServerTransaction { txn = _messageStore.newTransaction(); } - txn.enqueueMessage(queue, message); + enqueueRecords[i] = txn.enqueueMessage(queue, message); } - + i++; } if (txn != null) { @@ -200,13 +225,34 @@ public class AutoCommitTransaction implements ServerTransaction txn = null; } - postTransactionAction.postCommit(); + if(postTransactionAction != null) + { + postTransactionAction.postCommit(enqueueRecords); + } postTransactionAction = null; - }finally + } + finally { - rollbackIfNecessary(postTransactionAction, txn); + final EnqueueAction underlying = postTransactionAction; + rollbackIfNecessary(new Action() + { + @Override + public void postCommit() + { + + } + + @Override + public void onRollback() + { + if(underlying != null) + { + underlying.onRollback(); + } + } + }, txn); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java index cb713f671f..28b1cc2117 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java @@ -25,6 +25,7 @@ import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -74,26 +75,27 @@ public class DistributedTransaction implements ServerTransaction } } - public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction) + public void dequeue(MessageEnqueueRecord record, Action postTransactionAction) { if(_branch != null) { - _branch.dequeue(queue, message); + _branch.dequeue(record); _branch.addPostTransactionAction(postTransactionAction); } else { - _autoCommitTransaction.dequeue(queue, message, postTransactionAction); + _autoCommitTransaction.dequeue(record, postTransactionAction); } } + public void dequeue(Collection<MessageInstance> messages, Action postTransactionAction) { if(_branch != null) { for(MessageInstance entry : messages) { - _branch.dequeue(entry.getOwningResource(), entry.getMessage()); + _branch.dequeue(entry.getEnqueueRecord()); } _branch.addPostTransactionAction(postTransactionAction); } @@ -103,12 +105,33 @@ public class DistributedTransaction implements ServerTransaction } } - public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction) + public void enqueue(TransactionLogResource queue, EnqueueableMessage message, final EnqueueAction postTransactionAction) { if(_branch != null) { - _branch.enqueue(queue, message); - _branch.addPostTransactionAction(postTransactionAction); + final MessageEnqueueRecord[] enqueueRecords = new MessageEnqueueRecord[1]; + _branch.enqueue(queue, message, new org.apache.qpid.server.util.Action<MessageEnqueueRecord>() + { + @Override + public void performAction(final MessageEnqueueRecord record) + { + enqueueRecords[0] = record; + } + }); + addPostTransactionAction(new Action() + { + @Override + public void postCommit() + { + postTransactionAction.postCommit(enqueueRecords); + } + + @Override + public void onRollback() + { + postTransactionAction.onRollback(); + } + }); } else { @@ -117,15 +140,39 @@ public class DistributedTransaction implements ServerTransaction } public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message, - Action postTransactionAction) + final EnqueueAction postTransactionAction) { if(_branch != null) { + final MessageEnqueueRecord[] enqueueRecords = new MessageEnqueueRecord[queues.size()]; + int i = 0; for(BaseQueue queue : queues) { - _branch.enqueue(queue, message); + final int pos = i; + _branch.enqueue(queue, message, new org.apache.qpid.server.util.Action<MessageEnqueueRecord>() + { + @Override + public void performAction(final MessageEnqueueRecord record) + { + enqueueRecords[pos] = record; + } + }); + i++; } - _branch.addPostTransactionAction(postTransactionAction); + addPostTransactionAction(new Action() + { + @Override + public void postCommit() + { + postTransactionAction.postCommit(enqueueRecords); + } + + @Override + public void onRollback() + { + postTransactionAction.onRollback(); + } + }); } else { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java index 15b66a90a5..a3521f410c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ScheduledFuture; import org.slf4j.Logger; @@ -32,10 +33,14 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.store.MessageDurability; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.transport.Xid; @@ -48,14 +53,15 @@ public class DtxBranch private State _state = State.ACTIVE; private long _timeout; private Map<AMQSessionModel, State> _associatedSessions = new HashMap<AMQSessionModel, State>(); - private final List<Record> _enqueueRecords = new ArrayList<Record>(); - private final List<Record> _dequeueRecords = new ArrayList<Record>(); + private final List<EnqueueRecord> _enqueueRecords = new ArrayList<>(); + private final List<DequeueRecord> _dequeueRecords = new ArrayList<>(); private Transaction _transaction; private long _expiration; private VirtualHostImpl _vhost; private ScheduledFuture<?> _timeoutFuture; private MessageStore _store; + private Transaction.StoredXidRecord _storedXidRecord; public enum State @@ -77,6 +83,12 @@ public class DtxBranch _vhost = vhost; } + public DtxBranch(Transaction.StoredXidRecord storedXidRecord, MessageStore store, VirtualHostImpl vhost) + { + this(new Xid(storedXidRecord.getFormat(), storedXidRecord.getGlobalId(), storedXidRecord.getBranchId()), store, vhost); + _storedXidRecord = storedXidRecord; + } + public Xid getXid() { return _xid; @@ -227,11 +239,11 @@ public class DtxBranch } Transaction txn = _store.newTransaction(); - txn.recordXid(_xid.getFormat(), + _storedXidRecord = txn.recordXid(_xid.getFormat(), _xid.getGlobalId(), _xid.getBranchId(), - _enqueueRecords.toArray(new Record[_enqueueRecords.size()]), - _dequeueRecords.toArray(new Record[_dequeueRecords.size()])); + _enqueueRecords.toArray(new EnqueueRecord[_enqueueRecords.size()]), + _dequeueRecords.toArray(new DequeueRecord[_dequeueRecords.size()])); txn.commitTran(); prePrepareTransaction(); @@ -266,7 +278,7 @@ public class DtxBranch // prepare has previously been called Transaction txn = _store.newTransaction(); - txn.removeXid(_xid.getFormat(), _xid.getGlobalId(), _xid.getBranchId()); + txn.removeXid(_storedXidRecord); txn.commitTran(); _transaction.abortTran(); @@ -309,7 +321,7 @@ public class DtxBranch } else { - _transaction.removeXid(_xid.getFormat(), _xid.getGlobalId(), _xid.getBranchId()); + _transaction.removeXid(_storedXidRecord); } _transaction.commitTran(); @@ -324,21 +336,25 @@ public class DtxBranch { _transaction = _store.newTransaction(); - for(Record enqueue : _enqueueRecords) + for(final EnqueueRecord enqueue : _enqueueRecords) { + final MessageEnqueueRecord record; if(enqueue.isDurable()) { - _transaction.enqueueMessage(enqueue.getResource(), enqueue.getMessage()); + record = _transaction.enqueueMessage(enqueue.getResource(), enqueue.getMessage()); + } + else + { + record = null; + } + enqueue.getEnqueueAction().performAction(record); } - for(Record enqueue : _dequeueRecords) + for(DequeueRecord dequeue : _dequeueRecords) { - if(enqueue.isDurable()) - { - _transaction.dequeueMessage(enqueue.getResource(), enqueue.getMessage()); - } + _transaction.dequeueMessage(dequeue.getEnqueueRecord()); } } @@ -349,28 +365,58 @@ public class DtxBranch } - public void dequeue(TransactionLogResource resource, EnqueueableMessage message) + public void dequeue(MessageEnqueueRecord record) { - _dequeueRecords.add(new Record(resource, message)); + if(record != null) + { + _dequeueRecords.add(new DequeueRecord(record)); + } } + public void enqueue(TransactionLogResource queue, + EnqueueableMessage message, + final Action<MessageEnqueueRecord> enqueueAction) + { + _enqueueRecords.add(new EnqueueRecord(queue, message, enqueueAction)); + } - public void enqueue(TransactionLogResource queue, EnqueueableMessage message) + private static class DequeueRecord implements Transaction.DequeueRecord { - _enqueueRecords.add(new Record(queue, message)); + private final MessageEnqueueRecord _enqueueRecord; + + public DequeueRecord(MessageEnqueueRecord enqueueRecord) + { + _enqueueRecord = enqueueRecord; + } + + public MessageEnqueueRecord getEnqueueRecord() + { + return _enqueueRecord; + } + + } - private static final class Record implements Transaction.Record + private static class EnqueueRecord implements Transaction.EnqueueRecord { private final TransactionLogResource _resource; private final EnqueueableMessage _message; - public Record(TransactionLogResource resource, EnqueueableMessage message) + private final Action<MessageEnqueueRecord> _enqueueAction; + + public EnqueueRecord(final TransactionLogResource resource, + final EnqueueableMessage message, + final Action<MessageEnqueueRecord> enqueueAction) { _resource = resource; _message = message; + _enqueueAction = enqueueAction; } + public Action<MessageEnqueueRecord> getEnqueueAction() + { + return _enqueueAction; + } public TransactionLogResource getResource() { return _resource; @@ -385,6 +431,7 @@ public class DtxBranch { return _resource.getMessageDurability().persist(_message.isPersistent()); } + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index b800556312..541b60e576 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -30,8 +30,8 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.Transaction; @@ -92,23 +92,23 @@ public class LocalTransaction implements ServerTransaction _postTransactionActions.add(postTransactionAction); } - public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction) + public void dequeue(MessageEnqueueRecord record, Action postTransactionAction) { sync(); _postTransactionActions.add(postTransactionAction); initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime(); - if(queue.getMessageDurability().persist(message.isPersistent())) + if(record != null) { try { if (_logger.isDebugEnabled()) { - _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName()); + _logger.debug("Dequeue of message number " + record.getMessageNumber() + " from transaction log. Queue : " + record.getQueueId()); } beginTranIfNecessary(); - _transaction.dequeueMessage(queue, message); + _transaction.dequeueMessage(record); } catch(RuntimeException e) { @@ -127,18 +127,16 @@ public class LocalTransaction implements ServerTransaction { for(MessageInstance entry : queueEntries) { - ServerMessage message = entry.getMessage(); - TransactionLogResource queue = entry.getOwningResource(); - - if(queue.getMessageDurability().persist(message.isPersistent())) + final MessageEnqueueRecord record = entry.getEnqueueRecord(); + if(record != null) { if (_logger.isDebugEnabled()) { - _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName()); + _logger.debug("Dequeue of message number " + record.getMessageNumber() + " from transaction log. Queue : " + record.getQueueId()); } beginTranIfNecessary(); - _transaction.dequeueMessage(queue, message); + _transaction.dequeueMessage(record); } } @@ -181,10 +179,9 @@ public class LocalTransaction implements ServerTransaction } } - public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction) + public void enqueue(TransactionLogResource queue, EnqueueableMessage message, EnqueueAction postTransactionAction) { sync(); - _postTransactionActions.add(postTransactionAction); initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime(); if(queue.getMessageDurability().persist(message.isPersistent())) @@ -197,23 +194,83 @@ public class LocalTransaction implements ServerTransaction } beginTranIfNecessary(); - _transaction.enqueueMessage(queue, message); + final MessageEnqueueRecord record = _transaction.enqueueMessage(queue, message); + if(postTransactionAction != null) + { + final EnqueueAction underlying = postTransactionAction; + + _postTransactionActions.add(new Action() + { + @Override + public void postCommit() + { + underlying.postCommit(record); + } + + @Override + public void onRollback() + { + underlying.onRollback(); + } + }); + } } catch(RuntimeException e) { + if(postTransactionAction != null) + { + final EnqueueAction underlying = postTransactionAction; + + _postTransactionActions.add(new Action() + { + @Override + public void postCommit() + { + + } + + @Override + public void onRollback() + { + underlying.onRollback(); + } + }); + } tidyUpOnError(e); } } + else + { + if(postTransactionAction != null) + { + final EnqueueAction underlying = postTransactionAction; + _postTransactionActions.add(new Action() + { + @Override + public void postCommit() + { + underlying.postCommit((MessageEnqueueRecord)null); + } + + @Override + public void onRollback() + { + underlying.onRollback(); + } + }); + } + } } - public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message, Action postTransactionAction) + public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message, EnqueueAction postTransactionAction) { sync(); - _postTransactionActions.add(postTransactionAction); initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime(); try { + final MessageEnqueueRecord[] records = new MessageEnqueueRecord[queues.size()]; + int i = 0; for(BaseQueue queue : queues) { if(queue.getMessageDurability().persist(message.isPersistent())) @@ -224,13 +281,53 @@ public class LocalTransaction implements ServerTransaction } beginTranIfNecessary(); - _transaction.enqueueMessage(queue, message); + records[i] = _transaction.enqueueMessage(queue, message); } + i++; + } + if(postTransactionAction != null) + { + final EnqueueAction underlying = postTransactionAction; + + _postTransactionActions.add(new Action() + { + @Override + public void postCommit() + { + underlying.postCommit(records); + } + + @Override + public void onRollback() + { + underlying.onRollback(); + } + }); + postTransactionAction = null; } } catch(RuntimeException e) { + if(postTransactionAction != null) + { + final EnqueueAction underlying = postTransactionAction; + + _postTransactionActions.add(new Action() + { + @Override + public void postCommit() + { + + } + + @Override + public void onRollback() + { + underlying.onRollback(); + } + }); + } tidyUpOnError(e); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java index cae5fa73bf..5251eea9dd 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java @@ -26,6 +26,7 @@ import java.util.List; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.TransactionLogResource; @@ -55,6 +56,15 @@ public interface ServerTransaction public void onRollback(); } + public static interface EnqueueAction + { + public void postCommit(MessageEnqueueRecord... records); + + public void onRollback(); + } + + + /** * Return the time the current transaction started. * @@ -75,12 +85,12 @@ public interface ServerTransaction */ void addPostTransactionAction(Action postTransactionAction); - /** + /** * Dequeue a message from a queue registering a post transaction action. - * - * A store operation will result only for a persistent message on a durable queue. + * + * A store operation will result only for a if the record is not null. */ - void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction); + void dequeue(MessageEnqueueRecord record, Action postTransactionAction); /** * Dequeue a message(s) from queue(s) registering a post transaction action. @@ -94,14 +104,14 @@ public interface ServerTransaction * * A store operation will result only for a persistent message on a durable queue. */ - void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction); + void enqueue(TransactionLogResource queue, EnqueueableMessage message, EnqueueAction postTransactionAction); /** * Enqueue a message(s) to queue(s) registering a post transaction action. * * Store operations will result only for a persistent messages on durable queues. */ - void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message, Action postTransactionAction); + void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message, EnqueueAction postTransactionAction); /** * Commit the transaction represented by this object. diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index e0b0d37cbe..df941d16bf 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -86,6 +86,7 @@ import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.GenericRecoverer; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreProvider; import org.apache.qpid.server.store.StoreException; @@ -1182,7 +1183,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } if(acquired) { - txn.dequeue(messageInstance.getOwningResource(), messageInstance.getMessage(), new ServerTransaction.Action() + txn.dequeue(messageInstance.getEnqueueRecord(), new ServerTransaction.Action() { public void postCommit() { @@ -1201,11 +1202,11 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte final ServerMessage message = entry.getMessage(); final AMQQueue toQueue = (AMQQueue)queue; - txn.enqueue(toQueue, message, new ServerTransaction.Action() + txn.enqueue(toQueue, message, new ServerTransaction.EnqueueAction() { - public void postCommit() + public void postCommit(MessageEnqueueRecord... records) { - toQueue.enqueue(message, null); + toQueue.enqueue(message, null, records[0]); } public void onRollback() @@ -1222,12 +1223,12 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte if(entry.acquire()) { txn.enqueue(toQueue, message, - new ServerTransaction.Action() + new ServerTransaction.EnqueueAction() { - public void postCommit() + public void postCommit(MessageEnqueueRecord... records) { - toQueue.enqueue(message, null); + toQueue.enqueue(message, null, records[0]); } public void onRollback() @@ -1235,20 +1236,23 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte entry.release(); } }); - txn.dequeue(entry.getOwningResource(), message, - new ServerTransaction.Action() - { - - public void postCommit() + if(entry instanceof QueueEntry) + { + txn.dequeue(entry.getEnqueueRecord(), + new ServerTransaction.Action() { - entry.delete(); - } - public void onRollback() - { + public void postCommit() + { + entry.delete(); + } - } - }); + public void onRollback() + { + + } + }); + } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java index a24ddec7df..696a4c3da9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java @@ -25,7 +25,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.UUID; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -38,12 +37,12 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.messages.TransactionLogMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; -import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoredMessage; @@ -54,6 +53,7 @@ import org.apache.qpid.server.store.handler.MessageInstanceHandler; import org.apache.qpid.server.txn.DtxBranch; import org.apache.qpid.server.txn.DtxRegistry; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; import org.apache.qpid.transport.Xid; import org.apache.qpid.transport.util.Functions; @@ -91,6 +91,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer private final AtomicBoolean _recoveryComplete = new AtomicBoolean(); private final Map<Long, MessageReference<? extends ServerMessage<?>>> _recoveredMessages = new HashMap<>(); private final ExecutorService _queueRecoveryExecutor = Executors.newCachedThreadPool(); + private final MessageStore.MessageStoreReader _storeReader; private AtomicBoolean _continueRecovery = new AtomicBoolean(true); private AsynchronousRecoverer(final VirtualHostImpl<?, ?, ?> virtualHost) @@ -98,6 +99,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer _virtualHost = virtualHost; _eventLogger = virtualHost.getEventLogger(); _store = virtualHost.getMessageStore(); + _storeReader = _store.newMessageStoreReader(); _logSubject = new MessageStoreLogSubject(virtualHost.getName(), _store.getClass().getSimpleName()); _maxMessageId = _store.getNextMessageId(); @@ -107,7 +109,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer public void recover() { - getStore().visitDistributedTransactions(new DistributedTransactionVisitor()); + getStoreReader().visitDistributedTransactions(new DistributedTransactionVisitor()); for(AMQQueue<?> queue : _recoveringQueues) { @@ -125,9 +127,9 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer return _eventLogger; } - public MessageStore getStore() + public MessageStore.MessageStoreReader getStoreReader() { - return _store; + return _storeReader; } public MessageStoreLogSubject getLogSubject() @@ -143,7 +145,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer private void recoverQueue(AMQQueue<?> queue) { MessageInstanceVisitor handler = new MessageInstanceVisitor(queue); - _store.visitMessageInstances(queue, handler); + _storeReader.visitMessageInstances(queue, handler); getEventLogger().message(getLogSubject(), TransactionLogMessages.RECOVERED(handler.getRecoveredCount(), queue.getName())); getEventLogger().message(getLogSubject(), TransactionLogMessages.RECOVERY_COMPLETE(queue.getName(), true)); @@ -165,18 +167,18 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer entry.setValue(null); // free up any memory associated with the reference object } final List<StoredMessage<?>> messagesToDelete = new ArrayList<>(); - getStore().visitMessages(new MessageHandler() + getStoreReader().visitMessages(new MessageHandler() { @Override public boolean handle(final StoredMessage<?> storedMessage) { long messageNumber = storedMessage.getMessageNumber(); - if(!_recoveredMessages.containsKey(messageNumber)) + if (!_recoveredMessages.containsKey(messageNumber)) { messagesToDelete.add(storedMessage); } - return _continueRecovery.get() && messageNumber <_maxMessageId-1; + return _continueRecovery.get() && messageNumber < _maxMessageId - 1; } }); for(StoredMessage<?> storedMessage : messagesToDelete) @@ -192,6 +194,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer messagesToDelete.clear(); _recoveredMessages.clear(); + _storeReader.close(); } private synchronized ServerMessage<?> getRecoveredMessage(final long messageId) @@ -199,7 +202,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer MessageReference<? extends ServerMessage<?>> ref = _recoveredMessages.get(messageId); if (ref == null) { - StoredMessage<?> message = _store.getMessage(messageId); + StoredMessage<?> message = _storeReader.getMessage(messageId); if(message != null) { StorableMessageMetaData metaData = message.getMetaData(); @@ -234,6 +237,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer { Thread.currentThread().interrupt(); } + _storeReader.close(); } @@ -241,23 +245,20 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer { - @Override - public boolean handle(long format, - byte[] globalId, - byte[] branchId, - Transaction.Record[] enqueues, - Transaction.Record[] dequeues) + public boolean handle(final Transaction.StoredXidRecord storedXid, + final Transaction.EnqueueRecord[] enqueues, + final Transaction.DequeueRecord[] dequeues) { - Xid id = new Xid(format, globalId, branchId); + Xid id = new Xid(storedXid.getFormat(), storedXid.getGlobalId(), storedXid.getBranchId()); DtxRegistry dtxRegistry = getVirtualHost().getDtxRegistry(); DtxBranch branch = dtxRegistry.getBranch(id); if (branch == null) { - branch = new DtxBranch(id, getStore(), getVirtualHost()); + branch = new DtxBranch(storedXid, _store, getVirtualHost()); dtxRegistry.registerBranch(branch); } - for (Transaction.Record record : enqueues) + for (Transaction.EnqueueRecord record : enqueues) { final AMQQueue<?> queue = getVirtualHost().getQueue(record.getResource().getId()); if (queue != null) @@ -269,22 +270,32 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer { final MessageReference<?> ref = message.newReference(); - branch.enqueue(queue, message); + final MessageEnqueueRecord[] records = new MessageEnqueueRecord[1]; + branch.enqueue(queue, message, new Action<MessageEnqueueRecord>() + { + @Override + public void performAction(final MessageEnqueueRecord record) + { + records[0] = record; + } + }); branch.addPostTransactionAction(new ServerTransaction.Action() { - + @Override public void postCommit() { - queue.enqueue(message, null); + queue.enqueue(message, null, records[0]); ref.release(); } + @Override public void onRollback() { ref.release(); } }); + } else { @@ -306,10 +317,10 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer } } - for (Transaction.Record record : dequeues) + for (Transaction.DequeueRecord record : dequeues) { - final AMQQueue<?> queue = getVirtualHost().getQueue(record.getResource().getId()); + final AMQQueue<?> queue = getVirtualHost().getQueue(record.getEnqueueRecord().getQueueId()); if (queue != null) { @@ -321,7 +332,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer recoverQueue(queue); } - final long messageId = record.getMessage().getMessageNumber(); + final long messageId = record.getEnqueueRecord().getMessageNumber(); final ServerMessage<?> message = getRecoveredMessage(messageId); if (message != null) @@ -330,7 +341,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer entry.acquire(); - branch.dequeue(queue, message); + branch.dequeue(entry.getEnqueueRecord()); branch.addPostTransactionAction(new ServerTransaction.Action() { @@ -362,8 +373,8 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer StringBuilder xidString = xidAsString(id); getEventLogger().message(getLogSubject(), TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), - record.getResource() - .getId() + record.getEnqueueRecord() + .getQueueId() .toString())); } @@ -427,8 +438,9 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer } @Override - public boolean handle(final UUID queueId, long messageId) + public boolean handle(final MessageEnqueueRecord record) { + long messageId = record.getMessageNumber(); String queueName = _queue.getName(); if(messageId < _maxMessageId) @@ -442,7 +454,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName); } - _queue.recover(message); + _queue.recover(message, record); _recoveredCount++; } else @@ -452,8 +464,8 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer + " referenced in log as enqueued in queue " + queueName + " is unknown, entry will be discarded"); - Transaction txn = getStore().newTransaction(); - txn.dequeueMessage(_queue, new DummyMessage(messageId)); + Transaction txn = _store.newTransaction(); + txn.dequeueMessage(record); txn.commitTranAsync(); } return _continueRecovery.get(); @@ -472,31 +484,5 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer } } - private static class DummyMessage implements EnqueueableMessage - { - - private final long _messageId; - - public DummyMessage(long messageId) - { - _messageId = messageId; - } - - public long getMessageNumber() - { - return _messageId; - } - - public boolean isPersistent() - { - return true; - } - - public StoredMessage getStoredMessage() - { - return null; - } - } - } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java index 8bdffeb3c6..64264955fb 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java @@ -33,25 +33,24 @@ import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.messages.MessageStoreMessages; import org.apache.qpid.server.logging.messages.TransactionLogMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; -import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.store.MessageDurability; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.Transaction; -import org.apache.qpid.server.store.Transaction.Record; -import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.store.Transaction.EnqueueRecord; import org.apache.qpid.server.store.handler.DistributedTransactionHandler; import org.apache.qpid.server.store.handler.MessageHandler; import org.apache.qpid.server.store.handler.MessageInstanceHandler; import org.apache.qpid.server.txn.DtxBranch; import org.apache.qpid.server.txn.DtxRegistry; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; import org.apache.qpid.transport.Xid; import org.apache.qpid.transport.util.Functions; @@ -64,6 +63,7 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer { EventLogger eventLogger = virtualHost.getEventLogger(); MessageStore store = virtualHost.getMessageStore(); + MessageStore.MessageStoreReader storeReader = store.newMessageStoreReader(); MessageStoreLogSubject logSubject = new MessageStoreLogSubject(virtualHost.getName(), store.getClass().getSimpleName()); Map<String, Integer> queueRecoveries = new TreeMap<>(); @@ -73,10 +73,10 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer eventLogger.message(logSubject, MessageStoreMessages.RECOVERY_START()); - store.visitMessages(new MessageVisitor(recoveredMessages, unusedMessages)); + storeReader.visitMessages(new MessageVisitor(recoveredMessages, unusedMessages)); eventLogger.message(logSubject, TransactionLogMessages.RECOVERY_START(null, false)); - store.visitMessageInstances(new MessageInstanceVisitor(virtualHost, store, queueRecoveries, + storeReader.visitMessageInstances(new MessageInstanceVisitor(virtualHost, store, queueRecoveries, recoveredMessages, unusedMessages)); for(Map.Entry<String,Integer> entry : queueRecoveries.entrySet()) { @@ -95,7 +95,7 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer } } - store.visitDistributedTransactions(new DistributedTransactionVisitor(virtualHost, store, eventLogger, + storeReader.visitDistributedTransactions(new DistributedTransactionVisitor(virtualHost, store, eventLogger, logSubject, recoveredMessages, unusedMessages)); @@ -174,8 +174,10 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer } @Override - public boolean handle(final UUID queueId, long messageId) + public boolean handle(final MessageEnqueueRecord record) { + final UUID queueId = record.getQueueId(); + long messageId = record.getMessageNumber(); AMQQueue<?> queue = _virtualHost.getQueue(queueId); if(queue != null) { @@ -196,7 +198,7 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer count = 0; } - queue.recover(message); + queue.recover(message, record); _queueRecoveries.put(queueName, ++count); } @@ -204,7 +206,7 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer { _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queueName + " is unknown, entry will be discarded"); Transaction txn = _store.newTransaction(); - txn.dequeueMessage(queue, new DummyMessage(messageId)); + txn.dequeueMessage(record); txn.commitTranAsync(); } } @@ -212,28 +214,7 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer { _logger.warn("Message id " + messageId + " in log references queue with id " + queueId + " which is not in the configuration, entry will be discarded"); Transaction txn = _store.newTransaction(); - TransactionLogResource mockQueue = - new TransactionLogResource() - { - @Override - public String getName() - { - return "<<UNKNOWN>>"; - } - - @Override - public UUID getId() - { - return queueId; - } - - @Override - public MessageDurability getMessageDurability() - { - return MessageDurability.DEFAULT; - } - }; - txn.dequeueMessage(mockQueue, new DummyMessage(messageId)); + txn.dequeueMessage(record); txn.commitTranAsync(); } return true; @@ -267,17 +248,19 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer } @Override - public boolean handle(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) + public boolean handle(final Transaction.StoredXidRecord storedXid, + final Transaction.EnqueueRecord[] enqueues, + final Transaction.DequeueRecord[] dequeues) { - Xid id = new Xid(format, globalId, branchId); + Xid id = new Xid(storedXid.getFormat(), storedXid.getGlobalId(), storedXid.getBranchId()); DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry(); DtxBranch branch = dtxRegistry.getBranch(id); if(branch == null) { - branch = new DtxBranch(id, _store, _virtualHost); + branch = new DtxBranch(storedXid, _store, _virtualHost); dtxRegistry.registerBranch(branch); } - for(Transaction.Record record : enqueues) + for(EnqueueRecord record : enqueues) { final AMQQueue<?> queue = _virtualHost.getQueue(record.getResource().getId()); if(queue != null) @@ -289,23 +272,32 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer if(message != null) { final MessageReference<?> ref = message.newReference(); + final MessageEnqueueRecord[] records = new MessageEnqueueRecord[1]; - branch.enqueue(queue,message); - + branch.enqueue(queue, message, new Action<MessageEnqueueRecord>() + { + @Override + public void performAction(final MessageEnqueueRecord record) + { + records[0] = record; + } + }); branch.addPostTransactionAction(new ServerTransaction.Action() { - + @Override public void postCommit() { - queue.enqueue(message, null); + queue.enqueue(message, null, records[0]); ref.release(); } + @Override public void onRollback() { ref.release(); } }); + } else { @@ -324,12 +316,12 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer } } - for(Transaction.Record record : dequeues) + for(Transaction.DequeueRecord record : dequeues) { - final AMQQueue<?> queue = _virtualHost.getQueue(record.getResource().getId()); + final AMQQueue<?> queue = _virtualHost.getQueue(record.getEnqueueRecord().getQueueId()); if(queue != null) { - final long messageId = record.getMessage().getMessageNumber(); + final long messageId = record.getEnqueueRecord().getMessageNumber(); final ServerMessage<?> message = _recoveredMessages.get(messageId); _unusedMessages.remove(messageId); @@ -339,7 +331,7 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer entry.acquire(); - branch.dequeue(queue, message); + branch.dequeue(entry.getEnqueueRecord()); branch.addPostTransactionAction(new ServerTransaction.Action() { @@ -370,7 +362,7 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer StringBuilder xidString = xidAsString(id); _eventLogger.message(_logSubject, TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), - record.getResource().getId().toString())); + record.getEnqueueRecord().getQueueId().toString())); } } @@ -395,30 +387,4 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer } - private static class DummyMessage implements EnqueueableMessage - { - - private final long _messageId; - - public DummyMessage(long messageId) - { - _messageId = messageId; - } - - public long getMessageNumber() - { - return _messageId; - } - - public boolean isPersistent() - { - return true; - } - - public StoredMessage getStoredMessage() - { - return null; - } - } - } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index c775a70cb8..b0cced47f6 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -503,7 +503,7 @@ public class TopicExchangeTest extends QpidTestCase when(message.getMessageNumber()).thenReturn(messageNumber); for(BaseQueue q : queues) { - q.enqueue(message, null); + q.enqueue(message, null, null); } return queues.size(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java index 749b58323b..f3fc11257f 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java @@ -179,7 +179,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase _queue.getConsumerCountWithCredit()); // Check sending a message ends up with the subscriber - _queue.enqueue(messageA, null); + _queue.enqueue(messageA, null, null); try { Thread.sleep(2000L); @@ -198,7 +198,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase 1 == _queue.getConsumerCountWithCredit()); ServerMessage messageB = createMessage(new Long (25)); - _queue.enqueue(messageB, null); + _queue.enqueue(messageB, null, null); assertNull(_consumer.getQueueContext()); } @@ -206,7 +206,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase public void testEnqueueMessageThenRegisterConsumer() throws Exception, InterruptedException { ServerMessage messageA = createMessage(new Long(24)); - _queue.enqueue(messageA, null); + _queue.enqueue(messageA, null, null); _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES)); @@ -223,8 +223,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase { ServerMessage messageA = createMessage(new Long(24)); ServerMessage messageB = createMessage(new Long(25)); - _queue.enqueue(messageA, null); - _queue.enqueue(messageB, null); + _queue.enqueue(messageA, null, null); + _queue.enqueue(messageB, null, null); _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES)); @@ -255,9 +255,9 @@ abstract class AbstractQueueTestBase extends QpidTestCase /* Enqueue three messages */ - _queue.enqueue(messageA, postEnqueueAction); - _queue.enqueue(messageB, postEnqueueAction); - _queue.enqueue(messageC, postEnqueueAction); + _queue.enqueue(messageA, postEnqueueAction, null); + _queue.enqueue(messageB, postEnqueueAction, null); + _queue.enqueue(messageC, postEnqueueAction, null); Thread.sleep(150); // Work done by QueueRunner Thread @@ -306,7 +306,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase final long expiration = System.currentTimeMillis() + messageExpirationOffset; when(messageA.getExpiration()).thenReturn(expiration); - _queue.enqueue(messageA, postEnqueueAction); + _queue.enqueue(messageA, postEnqueueAction, null); int subFlushWaitTime = 150; Thread.sleep(subFlushWaitTime); // Work done by QueueRunner Thread @@ -354,9 +354,9 @@ abstract class AbstractQueueTestBase extends QpidTestCase /* Enqueue three messages */ - _queue.enqueue(messageA, postEnqueueAction); - _queue.enqueue(messageB, postEnqueueAction); - _queue.enqueue(messageC, postEnqueueAction); + _queue.enqueue(messageA, postEnqueueAction, null); + _queue.enqueue(messageB, postEnqueueAction, null); + _queue.enqueue(messageC, postEnqueueAction, null); Thread.sleep(150); // Work done by QueueRunner Thread @@ -412,8 +412,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase /* Enqueue two messages */ - _queue.enqueue(messageA, postEnqueueAction); - _queue.enqueue(messageB, postEnqueueAction); + _queue.enqueue(messageA, postEnqueueAction, null); + _queue.enqueue(messageB, postEnqueueAction, null); Thread.sleep(150); // Work done by QueueRunner Thread @@ -450,7 +450,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase _queue.getConsumerCountWithCredit()); // Check sending a message ends up with the subscriber - _queue.enqueue(messageA, null); + _queue.enqueue(messageA, null, null); try { Thread.sleep(2000L); @@ -517,7 +517,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase _consumer.resend(entry); } - }); + }, null); @@ -530,7 +530,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase ServerMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(message, null); + _queue.enqueue(message, null, null); // Get message id Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0); @@ -546,7 +546,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase Long messageId = new Long(i); ServerMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(message, null); + _queue.enqueue(message, null, null); } // Get message ids List<Long> msgids = _queue.getMessagesOnTheQueue(5); @@ -567,7 +567,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase Long messageId = new Long(i); ServerMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(message, null); + _queue.enqueue(message, null, null); } // Get message ids List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5); @@ -588,7 +588,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase Long messageId = new Long(i); ServerMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(message, null); + _queue.enqueue(message, null, null); } // Get non-existent 0th QueueEntry & check returned list was empty @@ -763,10 +763,10 @@ abstract class AbstractQueueTestBase extends QpidTestCase _queue.setAttributes(Collections.<String, Object>singletonMap(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, Integer.valueOf(2))); - _queue.enqueue(createMessage(new Long(24)), null); + _queue.enqueue(createMessage(new Long(24)), null, null); verifyZeroInteractions(listener); - _queue.enqueue(createMessage(new Long(25)), null); + _queue.enqueue(createMessage(new Long(25)), null, null); verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold")); } @@ -775,9 +775,9 @@ abstract class AbstractQueueTestBase extends QpidTestCase { QueueNotificationListener listener = mock(QueueNotificationListener .class); - _queue.enqueue(createMessage(new Long(24)), null); - _queue.enqueue(createMessage(new Long(25)), null); - _queue.enqueue(createMessage(new Long(26)), null); + _queue.enqueue(createMessage(new Long(24)), null, null); + _queue.enqueue(createMessage(new Long(25)), null, null); + _queue.enqueue(createMessage(new Long(26)), null, null); _queue.setNotificationListener(listener); _queue.setAttributes(Collections.<String, Object>singletonMap(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, @@ -861,9 +861,9 @@ abstract class AbstractQueueTestBase extends QpidTestCase public void testOldestMessage() { AMQQueue<?> queue = getQueue(); - queue.enqueue(createMessage(1l, (byte)1, Collections.singletonMap("sortKey", (Object) "Z"), 10l), null); - queue.enqueue(createMessage(2l, (byte)4, Collections.singletonMap("sortKey", (Object) "M"), 100l), null); - queue.enqueue(createMessage(3l, (byte)9, Collections.singletonMap("sortKey", (Object) "A"), 1000l), null); + queue.enqueue(createMessage(1l, (byte)1, Collections.singletonMap("sortKey", (Object) "Z"), 10l), null, null); + queue.enqueue(createMessage(2l, (byte)4, Collections.singletonMap("sortKey", (Object) "M"), 100l), null, null); + queue.enqueue(createMessage(3l, (byte)9, Collections.singletonMap("sortKey", (Object) "A"), 1000l), null, null); assertEquals(10l,queue.getOldestMessageArrivalTime()); } @@ -875,7 +875,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase ServerMessage message = createMessage(1l); when(message.getArrivalTime()).thenReturn(arrivalTime); when(message.getExpiration()).thenReturn(expiration); - queue.enqueue(message,null); + queue.enqueue(message,null, null); queue.visit(new QueueEntryVisitor() { @Override @@ -940,7 +940,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase message = createMessage((long)i); // Put message on queue - queue.enqueue(message,null); + queue.enqueue(message,null, null); } try diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java index faa2d3ffb8..35db9b7ae2 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java @@ -86,7 +86,7 @@ public class LastValueQueueListTest extends TestCase { ServerMessage message = createTestServerMessage(null); - _list.add(message); + _list.add(message, null); int numberOfEntries = countEntries(_list); assertEquals(1, numberOfEntries); } @@ -95,7 +95,7 @@ public class LastValueQueueListTest extends TestCase { ServerMessage message = createTestServerMessage(null); - QueueEntry addedEntry = _list.add(message); + QueueEntry addedEntry = _list.add(message, null); addedEntry.acquire(); addedEntry.delete(); @@ -107,7 +107,7 @@ public class LastValueQueueListTest extends TestCase { ServerMessage message = createTestServerMessage(TEST_KEY_VALUE); - _list.add(message); + _list.add(message, null); int numberOfEntries = countEntries(_list); assertEquals(1, numberOfEntries); } @@ -116,7 +116,7 @@ public class LastValueQueueListTest extends TestCase { ServerMessage message = createTestServerMessage(TEST_KEY_VALUE); - QueueEntry addedEntry = _list.add(message); + QueueEntry addedEntry = _list.add(message, null); addedEntry.acquire(); addedEntry.delete(); @@ -129,8 +129,8 @@ public class LastValueQueueListTest extends TestCase ServerMessage message1 = createTestServerMessage(TEST_KEY_VALUE1); ServerMessage message2 = createTestServerMessage(TEST_KEY_VALUE2); - _list.add(message1); - _list.add(message2); + _list.add(message1, null); + _list.add(message2, null); int numberOfEntries = countEntries(_list); assertEquals(2, numberOfEntries); @@ -141,8 +141,8 @@ public class LastValueQueueListTest extends TestCase ServerMessage message1 = createTestServerMessage(TEST_KEY_VALUE); ServerMessage message2 = createTestServerMessage(TEST_KEY_VALUE); - _list.add(message1); - _list.add(message2); + _list.add(message1, null); + _list.add(message2, null); int numberOfEntries = countEntries(_list); assertEquals(1, numberOfEntries); @@ -153,10 +153,10 @@ public class LastValueQueueListTest extends TestCase ServerMessage message1 = createTestServerMessage(TEST_KEY_VALUE); ServerMessage message2 = createTestServerMessage(TEST_KEY_VALUE); - QueueEntry entry1 = _list.add(message1); + QueueEntry entry1 = _list.add(message1, null); entry1.acquire(); // simulate an in-progress delivery to consumer - _list.add(message2); + _list.add(message2, null); assertFalse(entry1.isDeleted()); assertEquals(2, countEntries(_list)); @@ -173,7 +173,7 @@ public class LastValueQueueListTest extends TestCase ServerMessage message = createTestServerMessage(TEST_KEY_VALUE); - QueueEntry addedEntry = _list.add(message); + QueueEntry addedEntry = _list.add(message, null); assertEquals(1, countEntries(_list)); assertEquals(1, _list.getLatestValuesMap().size()); @@ -193,8 +193,8 @@ public class LastValueQueueListTest extends TestCase ServerMessage message1 = createTestServerMessage(TEST_KEY_VALUE1); ServerMessage message2 = createTestServerMessage(TEST_KEY_VALUE2); - QueueEntry addedEntry1 = _list.add(message1); - QueueEntry addedEntry2 = _list.add(message2); + QueueEntry addedEntry1 = _list.add(message1, null); + QueueEntry addedEntry2 = _list.add(message2, null); assertEquals(2, countEntries(_list)); assertEquals(2, _list.getLatestValuesMap().size()); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java index fc04af416f..1d8c9f1626 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java @@ -43,11 +43,11 @@ public class LastValueQueueTest extends AbstractQueueTestBase public void testOldestMessage() { AMQQueue<?> queue = getQueue(); - queue.enqueue(createMessage(1l, (byte)1, Collections.singletonMap("lvqKey", (Object) "Z"), 10l), null); + queue.enqueue(createMessage(1l, (byte)1, Collections.singletonMap("lvqKey", (Object) "Z"), 10l), null, null); assertEquals(10l,queue.getOldestMessageArrivalTime()); - queue.enqueue(createMessage(2l, (byte)4, Collections.singletonMap("lvqKey", (Object) "M"), 100l), null); + queue.enqueue(createMessage(2l, (byte)4, Collections.singletonMap("lvqKey", (Object) "M"), 100l), null, null); assertEquals(10l,queue.getOldestMessageArrivalTime()); - queue.enqueue(createMessage(3l, (byte)9, Collections.singletonMap("lvqKey", (Object) "Z"), 1000l), null); + queue.enqueue(createMessage(3l, (byte)9, Collections.singletonMap("lvqKey", (Object) "Z"), 1000l), null, null); assertEquals(100l,queue.getOldestMessageArrivalTime()); } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java index 08c0de95d5..b704a590e4 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; @@ -66,6 +67,12 @@ public class MockMessageInstance implements MessageInstance } @Override + public MessageEnqueueRecord getEnqueueRecord() + { + return null; + } + + @Override public boolean isAcquiredBy(final ConsumerImpl consumer) { return false; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java index 49e333c364..ec5f912a87 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java @@ -88,7 +88,7 @@ public class PriorityQueueListTest extends QpidTestCase when(ref.getMessage()).thenReturn(message); when(header.getPriority()).thenReturn(PRIORITIES[i]); - entries[i] = _list.add(message); + entries[i] = _list.add(message, null); } _priority4message1 = entries[0]; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java index fdc0411f95..57829664b7 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java @@ -48,19 +48,19 @@ public class PriorityQueueTest extends AbstractQueueTestBase // Enqueue messages in order AbstractQueue queue = (AbstractQueue) getQueue(); - queue.enqueue(createMessage(1L, (byte) 10), null); - queue.enqueue(createMessage(2L, (byte) 4), null); - queue.enqueue(createMessage(3L, (byte) 0), null); + queue.enqueue(createMessage(1L, (byte) 10), null, null); + queue.enqueue(createMessage(2L, (byte) 4), null, null); + queue.enqueue(createMessage(3L, (byte) 0), null, null); // Enqueue messages in reverse order - queue.enqueue(createMessage(4L, (byte) 0), null); - queue.enqueue(createMessage(5L, (byte) 4), null); - queue.enqueue(createMessage(6L, (byte) 10), null); + queue.enqueue(createMessage(4L, (byte) 0), null, null); + queue.enqueue(createMessage(5L, (byte) 4), null, null); + queue.enqueue(createMessage(6L, (byte) 10), null, null); // Enqueue messages out of order - queue.enqueue(createMessage(7L, (byte) 4), null); - queue.enqueue(createMessage(8L, (byte) 10), null); - queue.enqueue(createMessage(9L, (byte) 0), null); + queue.enqueue(createMessage(7L, (byte) 4), null, null); + queue.enqueue(createMessage(8L, (byte) 10), null, null); + queue.enqueue(createMessage(9L, (byte) 0), null, null); // Register subscriber queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(ConsumerImpl.Option.class)); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java index 2eb083bdb3..fe87a2acf1 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java @@ -263,7 +263,7 @@ public abstract class QueueEntryImplTestBase extends TestCase when(reference.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(reference); when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference); - QueueEntryImpl entry = (QueueEntryImpl) queueEntryList.add(message); + QueueEntryImpl entry = (QueueEntryImpl) queueEntryList.add(message, null); entries[i] = entry; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java index 7f5ea06dcf..43d140f277 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java @@ -56,7 +56,7 @@ public abstract class QueueEntryListTestBase extends TestCase public void testAddSpecificMessage() { final QueueEntryList list = getTestList(); - list.add(getTestMessageToAdd()); + list.add(getTestMessageToAdd(), null); final QueueEntryIterator iter = list.iterator(); int count = 0; @@ -77,7 +77,7 @@ public abstract class QueueEntryListTestBase extends TestCase { final QueueEntryList list = getTestList(); final ServerMessage message = createServerMessage(666l); - list.add(message); + list.add(message, null); final QueueEntryIterator iter = list.iterator(); int count = 0; @@ -220,8 +220,8 @@ public abstract class QueueEntryListTestBase extends TestCase QueueEntryList list = getTestList(true); int i = 0; - QueueEntry queueEntry1 = list.add(createServerMessage(i++)); - QueueEntry queueEntry2 = list.add(createServerMessage(i++)); + QueueEntry queueEntry1 = list.add(createServerMessage(i++), null); + QueueEntry queueEntry2 = list.add(createServerMessage(i++), null); assertSame(queueEntry2, list.next(queueEntry1)); assertNull(list.next(queueEntry2)); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java index 79d7628a9c..1d731f5cfd 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java @@ -28,11 +28,13 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.UUID; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.util.Action; @@ -60,13 +62,13 @@ public class QueueMessageRecoveryTest extends QpidTestCase queue.open(); - queue.recover(createMockMessage(0)); - queue.enqueue(createMockMessage(4), null); - queue.enqueue(createMockMessage(5), null); - queue.recover(createMockMessage(1)); - queue.recover(createMockMessage(2)); - queue.enqueue(createMockMessage(6), null); - queue.recover(createMockMessage(3)); + queue.recover(createMockMessage(0), createEnqueueRecord(0, queue)); + queue.enqueue(createMockMessage(4), null, null); + queue.enqueue(createMockMessage(5), null, null); + queue.recover(createMockMessage(1), createEnqueueRecord(1, queue)); + queue.recover(createMockMessage(2), createEnqueueRecord(2, queue)); + queue.enqueue(createMockMessage(6), null, null); + queue.recover(createMockMessage(3), createEnqueueRecord(3, queue)); assertEquals(4, messageList.size()); for(int i = 0; i < 4; i++) @@ -76,7 +78,7 @@ public class QueueMessageRecoveryTest extends QpidTestCase queue.completeRecovery(); - queue.enqueue(createMockMessage(7), null); + queue.enqueue(createMockMessage(7), null, null); assertEquals(8, messageList.size()); @@ -123,7 +125,7 @@ public class QueueMessageRecoveryTest extends QpidTestCase { for(int i = 0; i < size; i++) { - queue.recover(createMockMessage(i)); + queue.recover(createMockMessage(i), createEnqueueRecord(i, queue)); } queue.completeRecovery(); } @@ -137,7 +139,7 @@ public class QueueMessageRecoveryTest extends QpidTestCase { for(int i = 0; i < size; i++) { - queue.enqueue(createMockMessage(size + i), null); + queue.enqueue(createMockMessage(size + i), null, null); } } }, "publishing thread"); @@ -156,6 +158,24 @@ public class QueueMessageRecoveryTest extends QpidTestCase } } + private MessageEnqueueRecord createEnqueueRecord(final int messageNumber, final TestQueue queue) + { + return new MessageEnqueueRecord() + { + @Override + public UUID getQueueId() + { + return queue.getId(); + } + + @Override + public long getMessageNumber() + { + return messageNumber; + } + }; + } + private ServerMessage createMockMessage(final long i) { @@ -189,7 +209,7 @@ public class QueueMessageRecoveryTest extends QpidTestCase } @Override - protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action) + protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord record) { synchronized(_messageList) { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java index ffd91ba093..95788757b5 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java @@ -23,6 +23,7 @@ import org.junit.Assert; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.SortedQueueEntry.Colour; +import org.apache.qpid.server.store.MessageEnqueueRecord; /** * Test extension of SortedQueueEntryList that provides data structure validation tests. @@ -42,10 +43,10 @@ public class SelfValidatingSortedQueueEntryList extends SortedQueueEntryList } @Override /** Overridden to automatically check queue properties before and after. */ - public SortedQueueEntry add(final ServerMessage message) + public SortedQueueEntry add(final ServerMessage message, final MessageEnqueueRecord enqueueRecord) { assertQueueProperties(); //before add - final SortedQueueEntry result = super.add(message); + final SortedQueueEntry result = super.add(message, enqueueRecord); assertQueueProperties(); //after add return result; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java index f7e9768083..a11b0acb24 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java @@ -76,7 +76,7 @@ public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase when(reference.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(reference); when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference); - return (QueueEntryImpl) queueEntryList.add(message); + return (QueueEntryImpl) queueEntryList.add(message, null); } public void testCompareTo() diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java index 8dbe22f5b6..06b0ef51b9 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java @@ -129,7 +129,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase for(final String key : keys) { final ServerMessage msg = generateTestMessage(messageId++, key); - _sqel.add(msg); + _sqel.add(msg, null); } } @@ -224,7 +224,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase while(messageId < 200) { final ServerMessage msg = generateTestMessage(messageId++, "samekey"); - _sqel.add(msg); + _sqel.add(msg, null); } final QueueEntryIterator iter = getTestList().iterator(); @@ -245,7 +245,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase while(messageId < 200) { final ServerMessage msg = generateTestMessage(messageId++, null); - _sqel.add(msg); + _sqel.add(msg, null); } final QueueEntryIterator iter = getTestList().iterator(); @@ -267,7 +267,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase { final ServerMessage msg = generateTestMessage(messageId, textKey); messageId++; - _sqel.add(msg); + _sqel.add(msg, null); } final QueueEntryIterator iter = getTestList().iterator(); @@ -290,7 +290,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase { final ServerMessage msg = generateTestMessage(messageId, textkeys[i]); messageId++; - _sqel.add(msg); + _sqel.add(msg, null); } final QueueEntryIterator iter = getTestList().iterator(); @@ -308,13 +308,13 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase _sqel = new SelfValidatingSortedQueueEntryList(_testQueue); ServerMessage msg = generateTestMessage(1, "A"); - _sqel.add(msg); + _sqel.add(msg, null); SortedQueueEntry entry = _sqel.next(_sqel.getHead()); validateEntry(entry, "A", 1); msg = generateTestMessage(2, "B"); - _sqel.add(msg); + _sqel.add(msg, null); entry = _sqel.next(_sqel.getHead()); validateEntry(entry, "A", 1); @@ -328,13 +328,13 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase _sqel = new SelfValidatingSortedQueueEntryList(_testQueue); ServerMessage msg = generateTestMessage(1, "B"); - _sqel.add(msg); + _sqel.add(msg, null); SortedQueueEntry entry = _sqel.next(_sqel.getHead()); validateEntry(entry, "B", 1); msg = generateTestMessage(2, "A"); - _sqel.add(msg); + _sqel.add(msg, null); entry = _sqel.next(_sqel.getHead()); validateEntry(entry, "A", 2); @@ -348,12 +348,12 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase _sqel = new SelfValidatingSortedQueueEntryList(_testQueue); ServerMessage msg = generateTestMessage(1, "A"); - _sqel.add(msg); + _sqel.add(msg, null); SortedQueueEntry entry = _sqel.next(_sqel.getHead()); validateEntry(entry, "A", 1); msg = generateTestMessage(2, "C"); - _sqel.add(msg); + _sqel.add(msg, null); entry = _sqel.next(_sqel.getHead()); validateEntry(entry, "A", 1); @@ -362,7 +362,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase validateEntry(entry, "C", 2); msg = generateTestMessage(3, "B"); - _sqel.add(msg); + _sqel.add(msg, null); entry = _sqel.next(_sqel.getHead()); validateEntry(entry, "A", 1); @@ -379,13 +379,13 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase _sqel = new SelfValidatingSortedQueueEntryList(_testQueue); ServerMessage msg = generateTestMessage(1, "B"); - _sqel.add(msg); + _sqel.add(msg, null); SortedQueueEntry entry = _sqel.next(_sqel.getHead()); validateEntry(entry, "B", 1); msg = generateTestMessage(2, "D"); - _sqel.add(msg); + _sqel.add(msg, null); entry = _sqel.next(_sqel.getHead()); validateEntry(entry, "B", 1); @@ -394,7 +394,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase validateEntry(entry, "D", 2); msg = generateTestMessage(3, "C"); - _sqel.add(msg); + _sqel.add(msg, null); entry = _sqel.next(_sqel.getHead()); validateEntry(entry, "B", 1); @@ -406,7 +406,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase validateEntry(entry, "D", 2); msg = generateTestMessage(4, "A"); - _sqel.add(msg); + _sqel.add(msg, null); entry = _sqel.next(_sqel.getHead()); validateEntry(entry, "A", 4); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java index 0b72af17bc..785b437375 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java @@ -103,7 +103,7 @@ public class SortedQueueEntryTest extends QueueEntryImplTestBase when(reference.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(reference); when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference); - return _queueEntryList.add(message); + return _queueEntryList.add(message, null); } public void testCompareTo() diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java index c592622877..444cdc09fa 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java @@ -81,7 +81,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase when(message.newReference()).thenReturn(ref); when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref); - final QueueEntry bleh = _sqel.add(message); + final QueueEntry bleh = _sqel.add(message, null); assertNotNull("QE should not have been null", bleh); } } @@ -173,7 +173,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase when(ref.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(ref); when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref); - QueueEntry bleh = sqel.add(message); + QueueEntry bleh = sqel.add(message, null); assertNotNull("QE should not have been null", bleh); entriesMap.put(i,bleh); } @@ -276,7 +276,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase when(reference.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(reference); when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference); - entries[i] = (OrderedQueueEntry) queueEntryList.add(message); + entries[i] = (OrderedQueueEntry) queueEntryList.add(message, null); } // test getNext for not acquired entries diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java index 004096082a..268d11c9c5 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java @@ -36,6 +36,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class StandardQueueTest extends AbstractQueueTestBase @@ -59,7 +60,7 @@ public class StandardQueueTest extends AbstractQueueTestBase EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES)); - getQueue().enqueue(message, null); + getQueue().enqueue(message, null, null); consumer.close(); assertTrue("Queue was not deleted when consumer was removed", getQueue().isDeleted()); @@ -290,7 +291,8 @@ public class StandardQueueTest extends AbstractQueueTestBase * Entries with even message id are considered * dequeued! */ - protected DequeuedQueueEntry createQueueEntry(final ServerMessage message) + protected DequeuedQueueEntry createQueueEntry(final ServerMessage message, + final MessageEnqueueRecord enqueueRecord) { return new DequeuedQueueEntry(this, message); } @@ -311,7 +313,7 @@ public class StandardQueueTest extends AbstractQueueTestBase public DequeuedQueueEntry(DequeuedQueueEntryList list, final ServerMessage message) { - super(list, message); + super(list, message, null); _message = message; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java index f4b25665ba..0586f8dcce 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java @@ -114,9 +114,10 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple protected EnqueueableMessage addMessage(long id) { StorableMessageMetaData metaData = createMetaData(id, MESSAGE_DATA.length); - StoredMessage<?> handle = _store.addMessage(metaData); - handle.addContent(0, ByteBuffer.wrap(MESSAGE_DATA)); - TestMessage message = new TestMessage(id, handle); + MessageHandle<?> handle = _store.addMessage(metaData); + handle.addContent(ByteBuffer.wrap(MESSAGE_DATA)); + StoredMessage<? extends StorableMessageMetaData> storedMessage = handle.allContentAdded(); + TestMessage message = new TestMessage(id, storedMessage); return message; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java index 758799b81f..82c03c7545 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.store; import static org.mockito.Matchers.any; import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -40,7 +41,7 @@ import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.store.Transaction.Record; +import org.apache.qpid.server.store.Transaction.EnqueueRecord; import org.apache.qpid.server.store.handler.DistributedTransactionHandler; import org.apache.qpid.server.store.handler.MessageHandler; import org.apache.qpid.server.store.handler.MessageInstanceHandler; @@ -50,6 +51,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase { private MessageStore _store; private ConfiguredObject<?> _parent; + private MessageStore.MessageStoreReader _storeReader; public void setUp() throws Exception { @@ -60,6 +62,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase _store = createMessageStore(); _store.openMessageStore(_parent); + _storeReader = _store.newMessageStoreReader(); } protected abstract VirtualHost createVirtualHost(); @@ -73,52 +76,56 @@ public abstract class MessageStoreTestCase extends QpidTestCase protected void reopenStore() throws Exception { + _storeReader.close(); _store.closeMessageStore(); _store = createMessageStore(); _store.openMessageStore(_parent); + _storeReader = _store.newMessageStoreReader(); + } public void testAddAndRemoveRecordXid() throws Exception { long format = 1l; - Record enqueueRecord = getTestRecord(1); - Record dequeueRecord = getTestRecord(2); - Record[] enqueues = { enqueueRecord }; - Record[] dequeues = { dequeueRecord }; + EnqueueRecord enqueueRecord = getTestRecord(1); + TestRecord dequeueRecord = getTestRecord(2); + EnqueueRecord[] enqueues = { enqueueRecord }; + TestRecord[] dequeues = { dequeueRecord }; byte[] globalId = new byte[] { 1 }; byte[] branchId = new byte[] { 2 }; Transaction transaction = _store.newTransaction(); - transaction.recordXid(format, globalId, branchId, enqueues, dequeues); + final Transaction.StoredXidRecord record = + transaction.recordXid(format, globalId, branchId, enqueues, dequeues); transaction.commitTran(); reopenStore(); DistributedTransactionHandler handler = mock(DistributedTransactionHandler.class); - _store.visitDistributedTransactions(handler); - verify(handler, times(1)).handle(format,globalId, branchId, enqueues, dequeues); + _storeReader.visitDistributedTransactions(handler); + verify(handler, times(1)).handle(eq(record), argThat(new RecordMatcher(enqueues)), argThat(new DequeueRecordMatcher(dequeues))); transaction = _store.newTransaction(); - transaction.removeXid(1l, globalId, branchId); + transaction.removeXid(record); transaction.commitTran(); reopenStore(); handler = mock(DistributedTransactionHandler.class); - _store.visitDistributedTransactions(handler); - verify(handler, never()).handle(format,globalId, branchId, enqueues, dequeues); + _storeReader.visitDistributedTransactions(handler); + verify(handler, never()).handle(eq(record), argThat(new RecordMatcher(enqueues)), argThat(new DequeueRecordMatcher(dequeues))); } public void testVisitMessages() throws Exception { long messageId = 1; int contentSize = 0; - final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)); - enqueueMessage(message, "dummyQ"); + final MessageHandle<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)); + enqueueMessage(message.allContentAdded(), "dummyQ"); MessageHandler handler = mock(MessageHandler.class); - _store.visitMessages(handler); + _storeReader.visitMessages(handler); verify(handler, times(1)).handle(argThat(new MessageMetaDataMatcher(messageId))); @@ -176,14 +183,14 @@ public abstract class MessageStoreTestCase extends QpidTestCase int contentSize = 0; for (int i = 0; i < 3; i++) { - final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize)); - enqueueMessage(message, "dummyQ"); + final MessageHandle<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize)); + enqueueMessage(message.allContentAdded(), "dummyQ"); } MessageHandler handler = mock(MessageHandler.class); when(handler.handle(any(StoredMessage.class))).thenReturn(true, false); - _store.visitMessages(handler); + _storeReader.visitMessages(handler); verify(handler, times(2)).handle(any(StoredMessage.class)); } @@ -193,14 +200,14 @@ public abstract class MessageStoreTestCase extends QpidTestCase int contentSize = 0; for (int i = 0; i < 3; i++) { - final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize)); + final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize)).allContentAdded(); enqueueMessage(message, "dummyQ"); } reopenStore(); - final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(4, contentSize)); + final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(4, contentSize)).allContentAdded(); enqueueMessage(message, "dummyQ"); @@ -212,7 +219,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase { long messageId = 1; int contentSize = 0; - final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)); + final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)).allContentAdded(); EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, message); @@ -224,9 +231,8 @@ public abstract class MessageStoreTestCase extends QpidTestCase transaction.commitTran(); MessageInstanceHandler handler = mock(MessageInstanceHandler.class); - _store.visitMessageInstances(handler); - - verify(handler, times(1)).handle(queueId, messageId); + _storeReader.visitMessageInstances(handler); + verify(handler, times(1)).handle(argThat(new EnqueueRecordMatcher(queueId, messageId))); } public void testVisitDistributedTransactions() throws Exception @@ -234,19 +240,22 @@ public abstract class MessageStoreTestCase extends QpidTestCase long format = 1l; byte[] branchId = new byte[] { 2 }; byte[] globalId = new byte[] { 1 }; - Record enqueueRecord = getTestRecord(1); - Record dequeueRecord = getTestRecord(2); - Record[] enqueues = { enqueueRecord }; - Record[] dequeues = { dequeueRecord }; + EnqueueRecord enqueueRecord = getTestRecord(1); + TestRecord dequeueRecord = getTestRecord(2); + EnqueueRecord[] enqueues = { enqueueRecord }; + TestRecord[] dequeues = { dequeueRecord }; Transaction transaction = _store.newTransaction(); - transaction.recordXid(format, globalId, branchId, enqueues, dequeues); + final Transaction.StoredXidRecord record = + transaction.recordXid(format, globalId, branchId, enqueues, dequeues); transaction.commitTran(); DistributedTransactionHandler handler = mock(DistributedTransactionHandler.class); - _store.visitDistributedTransactions(handler); + _storeReader.visitDistributedTransactions(handler); - verify(handler, times(1)).handle(format,globalId, branchId, enqueues, dequeues); + verify(handler, times(1)).handle(eq(record), + argThat(new RecordMatcher(enqueues)), + argThat(new DequeueRecordMatcher(dequeues))); } @@ -267,7 +276,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase txn.commitTran(); QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId); - getStore().visitMessageInstances(filter); + _storeReader.visitMessageInstances(filter); Set<Long> enqueuedIds = filter.getEnqueuedIds(); assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); @@ -298,7 +307,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase txn.commitTran(); QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId); - getStore().visitMessageInstances(filter); + _storeReader.visitMessageInstances(filter); Set<Long> enqueuedIds = filter.getEnqueuedIds(); assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); @@ -333,7 +342,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase txn.commitTran(); QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId); - getStore().visitMessageInstances(filter); + _storeReader.visitMessageInstances(filter); Set<Long> enqueuedIds = filter.getEnqueuedIds(); assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); @@ -345,10 +354,10 @@ public abstract class MessageStoreTestCase extends QpidTestCase { long messageId = 1; int contentSize = 0; - final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize, false)); + final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize, false)).allContentAdded(); MessageHandler handler = mock(MessageHandler.class); - _store.visitMessages(handler); + _storeReader.visitMessages(handler); verify(handler, times(0)).handle(argThat(new MessageMetaDataMatcher(messageId))); } @@ -357,11 +366,11 @@ public abstract class MessageStoreTestCase extends QpidTestCase { long messageId = 1; int contentSize = 0; - final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)); + final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)).allContentAdded(); enqueueMessage(message, "dummyQ"); final AtomicReference<StoredMessage<?>> retrievedMessageRef = new AtomicReference<StoredMessage<?>>(); - _store.visitMessages(new MessageHandler() + _storeReader.visitMessages(new MessageHandler() { @Override @@ -379,7 +388,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase retrievedMessage.remove(); retrievedMessageRef.set(null); - _store.visitMessages(new MessageHandler() + _storeReader.visitMessages(new MessageHandler() { @Override @@ -411,7 +420,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase return enqueueableMessage; } - private Record getTestRecord(long messageNumber) + private TestRecord getTestRecord(long messageNumber) { UUID queueId1 = UUIDGenerator.generateRandomUUID(); TransactionLogResource queue1 = mock(TransactionLogResource.class); @@ -422,13 +431,13 @@ public abstract class MessageStoreTestCase extends QpidTestCase final StoredMessage<?> storedMessage = mock(StoredMessage.class); when(storedMessage.getMessageNumber()).thenReturn(messageNumber); when(message1.getStoredMessage()).thenReturn(storedMessage); - Record enqueueRecord = new TestRecord(queue1, message1); + TestRecord enqueueRecord = new TestRecord(queue1, message1); return enqueueRecord; } private EnqueueableMessage createEnqueueableMessage(long messageId1) { - final StoredMessage<TestMessageMetaData> message1 = _store.addMessage(new TestMessageMetaData(messageId1, 0)); + final StoredMessage<TestMessageMetaData> message1 = _store.addMessage(new TestMessageMetaData(messageId1, 0)).allContentAdded(); EnqueueableMessage enqueueableMessage1 = createMockEnqueueableMessage(messageId1, message1); return enqueueableMessage1; } @@ -468,9 +477,10 @@ public abstract class MessageStoreTestCase extends QpidTestCase } @Override - public boolean handle(UUID queueId, long messageId) + public boolean handle(final MessageEnqueueRecord record) { - if (queueId.equals(_queueId)) + long messageId = record.getMessageNumber(); + if (record.getQueueId().equals(_queueId)) { if (_enqueuedIds.contains(messageId)) { @@ -487,4 +497,103 @@ public abstract class MessageStoreTestCase extends QpidTestCase } } + private class EnqueueRecordMatcher extends ArgumentMatcher<MessageEnqueueRecord> + { + private final UUID _queueId; + private final long _messageId; + + public EnqueueRecordMatcher(final UUID queueId, final long messageId) + { + _queueId = queueId; + _messageId = messageId; + } + + @Override + public boolean matches(final Object argument) + { + if(argument instanceof MessageEnqueueRecord) + { + MessageEnqueueRecord record = (MessageEnqueueRecord)argument; + return record.getQueueId().equals(_queueId) && record.getMessageNumber() == _messageId; + } + return false; + } + } + + + private class RecordMatcher extends ArgumentMatcher<Transaction.EnqueueRecord[]> + { + + private final EnqueueRecord[] _expect; + + public RecordMatcher(Transaction.EnqueueRecord[] expect) + { + _expect = expect; + } + + @Override + public boolean matches(final Object argument) + { + if(argument.getClass().isArray() && Transaction.EnqueueRecord.class.isAssignableFrom(argument.getClass().getComponentType())) + { + Transaction.EnqueueRecord[] actual = (Transaction.EnqueueRecord[]) argument; + if(actual.length == _expect.length) + { + for(int i = 0; i < actual.length; i++) + { + if(!actual[i].getResource().getId().equals(_expect[i].getResource().getId()) + || actual[i].getMessage().getMessageNumber() != _expect[i].getMessage().getMessageNumber()) + { + return false; + } + } + return true; + } + else + { + return false; + } + + } + return false; + } + } + + private class DequeueRecordMatcher extends ArgumentMatcher<Transaction.DequeueRecord[]> + { + + private final Transaction.DequeueRecord[] _expect; + + public DequeueRecordMatcher(Transaction.DequeueRecord[] expect) + { + _expect = expect; + } + + @Override + public boolean matches(final Object argument) + { + if(argument.getClass().isArray() && Transaction.DequeueRecord.class.isAssignableFrom(argument.getClass().getComponentType())) + { + Transaction.DequeueRecord[] actual = (Transaction.DequeueRecord[]) argument; + if(actual.length == _expect.length) + { + for(int i = 0; i < actual.length; i++) + { + if(!actual[i].getEnqueueRecord().getQueueId().equals(_expect[i].getEnqueueRecord().getQueueId()) + || actual[i].getEnqueueRecord().getMessageNumber() != _expect[i].getEnqueueRecord().getMessageNumber()) + { + return false; + } + } + return true; + } + else + { + return false; + } + + } + return false; + } + } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java index 12b21fa964..eb12b2e320 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java @@ -35,7 +35,7 @@ public class TestMemoryMessageStore extends MemoryMessageStore public int getMessageCount() { final AtomicInteger counter = new AtomicInteger(); - visitMessages(new MessageHandler() + newMessageStoreReader().visitMessages(new MessageHandler() { @Override public boolean handle(StoredMessage<?> storedMessage) diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java index 668d9d5242..0ede362c0d 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java @@ -20,10 +20,12 @@ */ package org.apache.qpid.server.store; +import java.util.UUID; + import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.store.Transaction.Record; +import org.apache.qpid.server.store.Transaction.EnqueueRecord; -public class TestRecord implements Record +public class TestRecord implements EnqueueRecord, Transaction.DequeueRecord, MessageEnqueueRecord { private TransactionLogResource _queue; private EnqueueableMessage _message; @@ -68,11 +70,11 @@ public class TestRecord implements Record { return false; } - if (!(obj instanceof Record)) + if (!(obj instanceof EnqueueRecord)) { return false; } - Record other = (Record) obj; + EnqueueRecord other = (EnqueueRecord) obj; if (_message == null && other.getMessage() != null) { return false; @@ -88,4 +90,21 @@ public class TestRecord implements Record return _queue.getId().equals(other.getResource().getId()); } -}
\ No newline at end of file + @Override + public MessageEnqueueRecord getEnqueueRecord() + { + return this; + } + + @Override + public UUID getQueueId() + { + return _queue.getId(); + } + + @Override + public long getMessageNumber() + { + return _message.getMessageNumber(); + } +} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java index a61ac4f5d2..e13a972cf9 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java @@ -26,6 +26,7 @@ import java.util.Collections; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.store.MessageDurability; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.Transaction; @@ -42,7 +43,7 @@ public class AsyncAutoCommitTransactionTest extends QpidTestCase private BaseQueue _queue = mock(BaseQueue.class); private MessageStore _messageStore = mock(MessageStore.class); private Transaction _storeTransaction = mock(Transaction.class); - private Action _postTransactionAction = mock(Action.class); + private ServerTransaction.EnqueueAction _postTransactionAction = mock(ServerTransaction.EnqueueAction.class); private FutureResult _future = mock(FutureResult.class); @@ -70,7 +71,7 @@ public class AsyncAutoCommitTransactionTest extends QpidTestCase asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction); verify(_storeTransaction).enqueueMessage(_queue, _message); - verify(_futureRecorder).recordFuture(_future, _postTransactionAction); + verify(_futureRecorder).recordFuture(eq(_future), any(Action.class)); verifyZeroInteractions(_postTransactionAction); } @@ -87,7 +88,7 @@ public class AsyncAutoCommitTransactionTest extends QpidTestCase asyncAutoCommitTransaction.enqueue(Collections.singletonList(_queue), _message, _postTransactionAction); verify(_storeTransaction).enqueueMessage(_queue, _message); - verify(_futureRecorder).recordFuture(_future, _postTransactionAction); + verify(_futureRecorder).recordFuture(eq(_future), any(Action.class)); verifyZeroInteractions(_postTransactionAction); } @@ -104,7 +105,7 @@ public class AsyncAutoCommitTransactionTest extends QpidTestCase asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction); verify(_storeTransaction).enqueueMessage(_queue, _message); - verify(_futureRecorder).recordFuture(_future, _postTransactionAction); + verify(_futureRecorder).recordFuture(eq(_future), any(Action.class)); verifyZeroInteractions(_postTransactionAction); } @@ -120,7 +121,7 @@ public class AsyncAutoCommitTransactionTest extends QpidTestCase asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction); verifyZeroInteractions(_storeTransaction); - verify(_postTransactionAction).postCommit(); + verify(_postTransactionAction).postCommit((MessageEnqueueRecord)null); verifyZeroInteractions(_futureRecorder); } @@ -136,7 +137,7 @@ public class AsyncAutoCommitTransactionTest extends QpidTestCase asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction); verifyZeroInteractions(_storeTransaction); - verify(_futureRecorder).recordFuture(FutureResult.IMMEDIATE_FUTURE, _postTransactionAction); + verify(_futureRecorder).recordFuture(eq(FutureResult.IMMEDIATE_FUTURE), any(Action.class)); verifyZeroInteractions(_postTransactionAction); } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java index 5abbd7352b..49ececb00f 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java @@ -32,6 +32,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.MockMessageInstance; import org.apache.qpid.server.store.MessageDurability; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState; @@ -225,7 +226,7 @@ public class AutoCommitTransactionTest extends QpidTestCase _message = createTestMessage(false); _queue = createTestAMQQueue(false); - _transaction.dequeue(_queue, _message, _action); + _transaction.dequeue((MessageEnqueueRecord)null, _action); assertEquals("Dequeue of non-persistent message must not cause message to be dequeued", 0, _storeTransaction.getNumberOfDequeuedMessages()); assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); @@ -234,23 +235,6 @@ public class AutoCommitTransactionTest extends QpidTestCase } - /** - * Tests the dequeue of a persistent message from a single non durable queue. - * Asserts that a store transaction has not been started and post commit - * action fired. - */ - public void testDequeueFromDurableQueueOfPersistentMessage() throws Exception - { - _message = createTestMessage(true); - _queue = createTestAMQQueue(true); - - _transaction.dequeue(_queue, _message, _action); - - assertEquals("Dequeue of persistent message to durable queue must cause message to be dequeued",1, _storeTransaction.getNumberOfDequeuedMessages()); - assertEquals("Unexpected transaction state", TransactionState.COMMITTED, _storeTransaction.getState()); - assertFalse("Rollback action must not be fired", _action.isRollbackActionFired()); - assertTrue("Post commit action must be fired", _action.isPostCommitActionFired()); - } /** * Tests the case where the store operation throws an exception. @@ -268,7 +252,7 @@ public class AutoCommitTransactionTest extends QpidTestCase try { - _transaction.dequeue(_queue, _message, _action); + _transaction.dequeue(mock(MessageEnqueueRecord.class), _action); fail("Exception not thrown"); } catch (RuntimeException re) @@ -387,7 +371,7 @@ public class AutoCommitTransactionTest extends QpidTestCase { final BaseQueue queue = createTestAMQQueue(queueDurableFlags[i]); final ServerMessage message = createTestMessage(messagePersistentFlags[i]); - + final boolean hasRecord = queueDurableFlags[i] && messagePersistentFlags[i]; queueEntries.add(new MockMessageInstance() { @@ -402,7 +386,19 @@ public class AutoCommitTransactionTest extends QpidTestCase { return queue; } - + + @Override + public MessageEnqueueRecord getEnqueueRecord() + { + if(hasRecord) + { + return mock(MessageEnqueueRecord.class); + } + else + { + return null; + } + } }); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java index c905e52715..00ea9d6b01 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java @@ -32,6 +32,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.MockMessageInstance; import org.apache.qpid.server.store.MessageDurability; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState; @@ -222,7 +223,7 @@ public class LocalTransactionTest extends QpidTestCase _message = createTestMessage(false); _queue = createQueue(false); - _transaction.dequeue(_queue, _message, _action1); + _transaction.dequeue((MessageEnqueueRecord)null, _action1); assertEquals("Dequeue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages()); assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); @@ -239,7 +240,7 @@ public class LocalTransactionTest extends QpidTestCase _message = createTestMessage(true); _queue = createQueue(true); - _transaction.dequeue(_queue, _message, _action1); + _transaction.dequeue(mock(MessageEnqueueRecord.class), _action1); assertEquals("Dequeue of non-persistent message must cause message to be dequeued", 1, _storeTransaction.getNumberOfDequeuedMessages()); assertEquals("Unexpected transaction state", TransactionState.STARTED, _storeTransaction.getState()); @@ -261,7 +262,7 @@ public class LocalTransactionTest extends QpidTestCase try { - _transaction.dequeue(_queue, _message, _action1); + _transaction.dequeue(mock(MessageEnqueueRecord.class), _action1); fail("Exception not thrown"); } catch (RuntimeException re) @@ -404,7 +405,7 @@ public class LocalTransactionTest extends QpidTestCase assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); assertFalse("Post commit action must not be fired yet", _action1.isPostCommitActionFired()); - _transaction.dequeue(_queue, _message, _action1); + _transaction.dequeue(mock(MessageEnqueueRecord.class), _action1); assertEquals("Unexpected transaction state", TransactionState.STARTED, _storeTransaction.getState()); assertFalse("Post commit action must not be fired yet", _action1.isPostCommitActionFired()); @@ -428,7 +429,7 @@ public class LocalTransactionTest extends QpidTestCase assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState()); assertFalse("Rollback action must not be fired yet", _action1.isRollbackActionFired()); - _transaction.dequeue(_queue, _message, _action1); + _transaction.dequeue(mock(MessageEnqueueRecord.class), _action1); assertEquals("Unexpected transaction state", TransactionState.STARTED, _storeTransaction.getState()); assertFalse("Rollback action must not be fired yet", _action1.isRollbackActionFired()); @@ -451,7 +452,7 @@ public class LocalTransactionTest extends QpidTestCase _queue = createQueue(true); _transaction.addPostTransactionAction(_action1); - _transaction.dequeue(_queue, _message, _action2); + _transaction.dequeue(mock(MessageEnqueueRecord.class), _action2); _transaction.commit(); assertEquals("Unexpected transaction state", TransactionState.COMMITTED, _storeTransaction.getState()); @@ -473,7 +474,7 @@ public class LocalTransactionTest extends QpidTestCase _queue = createQueue(true); _transaction.addPostTransactionAction(_action1); - _transaction.dequeue(_queue, _message, _action2); + _transaction.dequeue(mock(MessageEnqueueRecord.class), _action2); _transaction.rollback(); assertEquals("Unexpected transaction state", TransactionState.ABORTED, _storeTransaction.getState()); @@ -532,7 +533,7 @@ public class LocalTransactionTest extends QpidTestCase _queue = createQueue(true); long startTime = System.currentTimeMillis(); - _transaction.dequeue(_queue, _message, _action1); + _transaction.dequeue(mock(MessageEnqueueRecord.class), _action1); assertTrue("Transaction start time should have been recorded", _transaction.getTransactionStartTime() >= startTime); assertEquals("Transaction update time should be the same as transaction start time", _transaction.getTransactionStartTime(), _transaction.getTransactionUpdateTime()); @@ -552,7 +553,7 @@ public class LocalTransactionTest extends QpidTestCase final long transactionUpdateTimeAfterFirstEnqueue = _transaction.getTransactionUpdateTime(); Thread.sleep(1); - _transaction.dequeue(_queue, _message, _action2); + _transaction.dequeue(mock(MessageEnqueueRecord.class), _action2); final long transactionStartTimeAfterFirstDequeue = _transaction.getTransactionStartTime(); final long transactionUpdateTimeAfterFirstDequeue = _transaction.getTransactionUpdateTime(); @@ -611,7 +612,7 @@ public class LocalTransactionTest extends QpidTestCase { final TransactionLogResource queue = createQueue(queueDurableFlags[i]); final ServerMessage message = createTestMessage(messagePersistentFlags[i]); - + final boolean hasRecord = queueDurableFlags[i] && messagePersistentFlags[i]; queueEntries.add(new MockMessageInstance() { @@ -626,7 +627,12 @@ public class LocalTransactionTest extends QpidTestCase { return queue; } - + + @Override + public MessageEnqueueRecord getEnqueueRecord() + { + return hasRecord ? mock(MessageEnqueueRecord.class) : null; + } }); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockAction.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockAction.java index 8fba28e200..9be896f0c4 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockAction.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockAction.java @@ -20,18 +20,24 @@ */ package org.apache.qpid.server.txn; -import org.apache.qpid.server.txn.ServerTransaction.Action; +import org.apache.qpid.server.store.MessageEnqueueRecord; /** * Mock implementation of a ServerTransaction Action * allowing its state to be observed. * */ -class MockAction implements Action +class MockAction implements ServerTransaction.EnqueueAction, ServerTransaction.Action { private boolean _rollbackFired = false; private boolean _postCommitFired = false; + public void postCommit(MessageEnqueueRecord... records) + { + _postCommitFired = true; + } + + @Override public void postCommit() { _postCommitFired = true; @@ -51,4 +57,4 @@ class MockAction implements Action { return _postCommitFired; } -}
\ No newline at end of file +} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java index 6fcfde0221..8aac7861ae 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java @@ -20,8 +20,11 @@ */ package org.apache.qpid.server.txn; +import java.util.UUID; + import org.apache.commons.lang.NotImplementedException; import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.NullMessageStore; import org.apache.qpid.server.util.FutureResult; @@ -60,7 +63,7 @@ class MockStoreTransaction implements Transaction return _state; } - public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) + public MessageEnqueueRecord enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) { if (_throwExceptionOnQueueOp) { @@ -69,6 +72,7 @@ class MockStoreTransaction implements Transaction } _numberOfEnqueuedMessages++; + return new MockEnqueueRecord(queue.getId(), message.getMessageNumber()); } public int getNumberOfDequeuedMessages() @@ -81,7 +85,8 @@ class MockStoreTransaction implements Transaction return _numberOfEnqueuedMessages; } - public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) + @Override + public void dequeueMessage(final MessageEnqueueRecord enqueueRecord) { if (_throwExceptionOnQueueOp) { @@ -110,8 +115,19 @@ class MockStoreTransaction implements Transaction { } - public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) + @Override + public void removeXid(final StoredXidRecord record) + { + + } + + public StoredXidRecord recordXid(long format, + byte[] globalId, + byte[] branchId, + EnqueueRecord[] enqueues, + DequeueRecord[] dequeues) { + return null; } public static MessageStore createTestTransactionLog(final MockStoreTransaction storeTransaction) @@ -126,4 +142,27 @@ class MockStoreTransaction implements Transaction } }; } + + private static class MockEnqueueRecord implements MessageEnqueueRecord + { + private final UUID _queueId; + private final long _messageNumber; + + public MockEnqueueRecord(final UUID queueId, + final long messageNumber) + { + _queueId = queueId; + _messageNumber = messageNumber; + } + + public UUID getQueueId() + { + return _queueId; + } + + public long getMessageNumber() + { + return _messageNumber; + } + } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java index 47fd5a7398..b4eddc5033 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java @@ -161,6 +161,8 @@ public class AbstractVirtualHostTest extends QpidTestCase { Map<String,Object> attributes = Collections.<String, Object>singletonMap(AbstractVirtualHost.NAME, getTestName()); final MessageStore store = mock(MessageStore.class); + when(store.newMessageStoreReader()).thenReturn(mock(MessageStore.MessageStoreReader.class)); + AbstractVirtualHost host = new AbstractVirtualHost(attributes, _node) { @Override @@ -200,7 +202,8 @@ public class AbstractVirtualHostTest extends QpidTestCase public void testActivateInErrorStateAfterOpen() throws Exception { - Map<String,Object> attributes = Collections.<String, Object>singletonMap(AbstractVirtualHost.NAME, getTestName()); + Map<String,Object> attributes = Collections.<String, Object>singletonMap(AbstractVirtualHost.NAME, + getTestName()); final MessageStore store = mock(MessageStore.class); doThrow(new RuntimeException("Cannot open store")).when(store).openMessageStore(any(ConfiguredObject.class)); AbstractVirtualHost host = new AbstractVirtualHost(attributes, _node) @@ -216,6 +219,7 @@ public class AbstractVirtualHostTest extends QpidTestCase assertEquals("Unexpected state", State.ERRORED, host.getState()); doNothing().when(store).openMessageStore(any(ConfiguredObject.class)); + when(store.newMessageStoreReader()).thenReturn(mock(MessageStore.MessageStoreReader.class)); host.setAttributes(Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, State.ACTIVE)); assertEquals("Unexpected state", State.ACTIVE, host.getState()); @@ -239,6 +243,7 @@ public class AbstractVirtualHostTest extends QpidTestCase assertEquals("Unexpected state", State.ERRORED, host.getState()); doNothing().when(store).openMessageStore(any(ConfiguredObject.class)); + when(store.newMessageStoreReader()).thenReturn(mock(MessageStore.MessageStoreReader.class)); host.start(); assertEquals("Unexpected state", State.ACTIVE, host.getState()); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java index 9c05ed564a..00089ea130 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java @@ -24,7 +24,6 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isNull; -import static org.mockito.Matchers.same; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -43,6 +42,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.MessageDurability; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.NullMessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; @@ -50,7 +50,7 @@ import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestMessageMetaData; import org.apache.qpid.server.store.Transaction; -import org.apache.qpid.server.store.Transaction.Record; +import org.apache.qpid.server.store.Transaction.EnqueueRecord; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.store.handler.DistributedTransactionHandler; import org.apache.qpid.server.store.handler.MessageHandler; @@ -93,7 +93,7 @@ public class SynchronousMessageStoreRecovererTest extends TestCase @Override public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException { - handler.handle(queue.getId(), messageId); + handler.handle(new TestMessageEnqueueRecord(queue.getId(), messageId)); } }; @@ -104,7 +104,7 @@ public class SynchronousMessageStoreRecovererTest extends TestCase recoverer.recover(_virtualHost); ServerMessage<?> message = storedMessage.getMetaData().getType().createMessage(storedMessage); - verify(queue, times(1)).recover(eq(message)); + verify(queue, times(1)).recover(eq(message), any(MessageEnqueueRecord.class)); } @SuppressWarnings("unchecked") @@ -126,7 +126,7 @@ public class SynchronousMessageStoreRecovererTest extends TestCase @Override public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException { - handler.handle(queue.getId(), messageId); + handler.handle(new TestMessageEnqueueRecord(queue.getId(), messageId)); } @Override @@ -142,8 +142,8 @@ public class SynchronousMessageStoreRecovererTest extends TestCase recoverer = new SynchronousMessageStoreRecoverer(); recoverer.recover(_virtualHost); - verify(queue, never()).enqueue(any(ServerMessage.class), any(Action.class)); - verify(transaction).dequeueMessage(same(queue), argThat(new MessageNumberMatcher(messageId))); + verify(queue, never()).enqueue(any(ServerMessage.class), any(Action.class), any(MessageEnqueueRecord.class)); + verify(transaction).dequeueMessage(argThat(new MessageEnqueueRecordMatcher(queue.getId(), messageId))); verify(transaction, times(1)).commitTranAsync(); } @@ -165,7 +165,7 @@ public class SynchronousMessageStoreRecovererTest extends TestCase @Override public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException { - handler.handle(queueId, messageId); + handler.handle(new TestMessageEnqueueRecord(queueId, messageId)); } @Override @@ -181,7 +181,7 @@ public class SynchronousMessageStoreRecovererTest extends TestCase recoverer = new SynchronousMessageStoreRecoverer(); recoverer.recover(_virtualHost); - verify(transaction).dequeueMessage(argThat(new QueueIdMatcher(queueId)), argThat(new MessageNumberMatcher(messageId))); + verify(transaction).dequeueMessage(argThat(new MessageEnqueueRecordMatcher(queueId,messageId))); verify(transaction, times(1)).commitTranAsync(); } @@ -226,13 +226,13 @@ public class SynchronousMessageStoreRecovererTest extends TestCase long messageId = storedMessage.getMessageNumber(); EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, storedMessage); - Record enqueueRecord = createMockRecord(queue, enqueueableMessage); + EnqueueRecord enqueueRecord = createMockRecord(queue, enqueueableMessage); final long format = 1; final byte[] globalId = new byte[] {0}; final byte[] branchId = new byte[] {0}; - final Record[] enqueues = { enqueueRecord }; - final Record[] dequeues = {}; + final EnqueueRecord[] enqueues = { enqueueRecord }; + final Transaction.DequeueRecord[] dequeues = {}; MessageStore store = new NullMessageStore() { @@ -251,7 +251,26 @@ public class SynchronousMessageStoreRecovererTest extends TestCase @Override public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException { - handler.handle(format, globalId, branchId, enqueues, dequeues); + handler.handle(new Transaction.StoredXidRecord() + { + @Override + public long getFormat() + { + return format; + } + + @Override + public byte[] getGlobalId() + { + return globalId; + } + + @Override + public byte[] getBranchId() + { + return branchId; + } + }, enqueues, dequeues); } @Override @@ -275,22 +294,21 @@ public class SynchronousMessageStoreRecovererTest extends TestCase branch.commit(); ServerMessage<?> message = storedMessage.getMetaData().getType().createMessage(storedMessage); - verify(queue, times(1)).enqueue(eq(message), (Action<? super MessageInstance>)isNull()); + verify(queue, times(1)).enqueue(eq(message), (Action<? super MessageInstance>)isNull(), any(MessageEnqueueRecord.class)); verify(transaction).commitTran(); } public void testRecoveryOfSingleDequeueWithDistributedTransaction() { - final AMQQueue<?> queue = createRegisteredMockQueue(); + final UUID queueId = UUID.randomUUID(); + final AMQQueue<?> queue = createRegisteredMockQueue(queueId); final Transaction transaction = mock(Transaction.class); - final StoredMessage<StorableMessageMetaData> storedMessage = createMockStoredMessage(1); final long messageId = storedMessage.getMessageNumber(); - EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, storedMessage); - Record dequeueRecord = createMockRecord(queue, enqueueableMessage); + Transaction.DequeueRecord dequeueRecord = createMockDequeueRecord(queueId, messageId); QueueEntry queueEntry = mock(QueueEntry.class); when(queue.getMessageOnTheQueue(messageId)).thenReturn(queueEntry); @@ -298,8 +316,8 @@ public class SynchronousMessageStoreRecovererTest extends TestCase final long format = 1; final byte[] globalId = new byte[] {0}; final byte[] branchId = new byte[] {0}; - final Record[] enqueues = {}; - final Record[] dequeues = { dequeueRecord }; + final EnqueueRecord[] enqueues = {}; + final Transaction.DequeueRecord[] dequeues = { dequeueRecord }; MessageStore store = new NullMessageStore() { @@ -314,13 +332,32 @@ public class SynchronousMessageStoreRecovererTest extends TestCase { // We need the message to be enqueued onto the queue so that later the distributed transaction // can dequeue it. - handler.handle(queue.getId(), messageId); + handler.handle(new TestMessageEnqueueRecord(queue.getId(), messageId)); } @Override public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException { - handler.handle(format, globalId, branchId, enqueues, dequeues); + handler.handle(new Transaction.StoredXidRecord() + { + @Override + public long getFormat() + { + return format; + } + + @Override + public byte[] getGlobalId() + { + return globalId; + } + + @Override + public byte[] getBranchId() + { + return branchId; + } + }, enqueues, dequeues); } @Override @@ -348,14 +385,25 @@ public class SynchronousMessageStoreRecovererTest extends TestCase } - protected Record createMockRecord(AMQQueue<?> queue, EnqueueableMessage enqueueableMessage) + protected EnqueueRecord createMockRecord(AMQQueue<?> queue, EnqueueableMessage enqueueableMessage) { - Record enqueueRecord = mock(Record.class); + EnqueueRecord enqueueRecord = mock(EnqueueRecord.class); when(enqueueRecord.getMessage()).thenReturn(enqueueableMessage); when(enqueueRecord.getResource()).thenReturn(queue); return enqueueRecord; } + + protected Transaction.DequeueRecord createMockDequeueRecord(UUID queueId, long messageNumber) + { + Transaction.DequeueRecord dequeueRecord = mock(Transaction.DequeueRecord.class); + MessageEnqueueRecord enqueueRecord = mock(MessageEnqueueRecord.class); + when(enqueueRecord.getMessageNumber()).thenReturn(messageNumber); + when(enqueueRecord.getQueueId()).thenReturn(queueId); + when(dequeueRecord.getEnqueueRecord()).thenReturn(enqueueRecord); + return dequeueRecord; + } + protected EnqueueableMessage createMockEnqueueableMessage(long messageId, final StoredMessage<StorableMessageMetaData> storedMessage) { @@ -378,8 +426,12 @@ public class SynchronousMessageStoreRecovererTest extends TestCase private AMQQueue<?> createRegisteredMockQueue() { + return createRegisteredMockQueue(UUID.randomUUID()); + } + + private AMQQueue<?> createRegisteredMockQueue(UUID queueId) + { AMQQueue<?> queue = mock(AMQQueue.class); - final UUID queueId = UUID.randomUUID(); when(queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT); when(queue.getId()).thenReturn(queueId); when(queue.getName()).thenReturn("test-queue"); @@ -404,19 +456,47 @@ public class SynchronousMessageStoreRecovererTest extends TestCase } } - private final class MessageNumberMatcher extends ArgumentMatcher<EnqueueableMessage> + private final class MessageEnqueueRecordMatcher extends ArgumentMatcher<MessageEnqueueRecord> { private final long _messageId; + private final UUID _queueId; - private MessageNumberMatcher(long messageId) + private MessageEnqueueRecordMatcher(UUID queueId, long messageId) { _messageId = messageId; + _queueId = queueId; } @Override public boolean matches(Object argument) { - return argument instanceof EnqueueableMessage && ((EnqueueableMessage)argument).getMessageNumber() == _messageId; + return argument instanceof MessageEnqueueRecord + && ((MessageEnqueueRecord)argument).getMessageNumber() == _messageId + && ((MessageEnqueueRecord)argument).getQueueId().equals(_queueId); + } + } + + private class TestMessageEnqueueRecord implements MessageEnqueueRecord + { + private final UUID _queueId; + private final long _messageId; + + public TestMessageEnqueueRecord(final UUID queueId, final long messageId) + { + _queueId = queueId; + _messageId = messageId; + } + + @Override + public UUID getQueueId() + { + return _queueId; + } + + @Override + public long getMessageNumber() + { + return _messageId; } } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index 5affe3019c..db053eef26 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -377,7 +377,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC private void forceDequeue(final MessageInstance entry, final boolean restoreCredit) { AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(_session.getVirtualHost().getMessageStore()); - dequeueTxn.dequeue(entry.getOwningResource(), entry.getMessage(), + dequeueTxn.dequeue(entry.getEnqueueRecord(), new ServerTransaction.Action() { public void postCommit() diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java index 69abcd7727..bd04db11ae 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java @@ -76,12 +76,6 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte } @Override - public void addContent(int offsetInMessage, ByteBuffer src) - { - throw new UnsupportedOperationException(); - } - - @Override public int getContent(int offsetInMessage, ByteBuffer dst) { int size = messageContent.length - offsetInMessage; diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java index 209eae9ad1..bd99458d51 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java @@ -83,12 +83,6 @@ public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, M } @Override - public void addContent(int offsetInMessage, ByteBuffer src) - { - throw new UnsupportedOperationException(); - } - - @Override public int getContent(int offsetInMessage, ByteBuffer dst) { return serverMsg.getContent(dst, offsetInMessage); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 67204427fb..12f79cf203 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -514,7 +514,7 @@ public class ServerSession extends Session public void acknowledge(final ConsumerTarget_0_10 sub, final MessageInstance entry) { - _transaction.dequeue(entry.getOwningResource(), entry.getMessage(), + _transaction.dequeue(entry.getEnqueueRecord(), new ServerTransaction.Action() { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 75a67c6c2a..694dda4da2 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -40,6 +40,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.store.MessageHandle; import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException; import org.apache.qpid.server.filter.AMQInvalidArgumentException; import org.apache.qpid.server.filter.ArrivalTimeFilter; @@ -493,13 +494,14 @@ public class ServerSessionDelegate extends SessionDelegate private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer xfr, final MessageMetaData_0_10 messageMetaData, final MessageStore store) { - final StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData); + final MessageHandle<MessageMetaData_0_10> addedMessage = store.addMessage(messageMetaData); ByteBuffer body = xfr.getBody(); if(body != null) { - storeMessage.addContent(0, body); + addedMessage.addContent(body); } - return storeMessage; + final StoredMessage<MessageMetaData_0_10> storedMessage = addedMessage.allContentAdded(); + return storedMessage; } @Override diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 16ea23b765..522408910d 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -96,6 +96,7 @@ import org.apache.qpid.server.protocol.ConsumerListener; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.store.MessageHandle; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; @@ -429,22 +430,24 @@ public class AMQChannel contentHeader, getConnection().getLastReceivedTime()); - final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(messageMetaData); - final AMQMessage amqMessage = createAMQMessage(handle); - MessageReference reference = amqMessage.newReference(); - try + final MessageHandle<MessageMetaData> handle = _messageStore.addMessage(messageMetaData); + int bodyCount = _currentMessage.getBodyCount(); + if(bodyCount > 0) { - int bodyCount = _currentMessage.getBodyCount(); - if(bodyCount > 0) + long bodyLengthReceived = 0; + for(int i = 0 ; i < bodyCount ; i++) { - long bodyLengthReceived = 0; - for(int i = 0 ; i < bodyCount ; i++) - { - ContentBody contentChunk = _currentMessage.getContentChunk(i); - handle.addContent((int)bodyLengthReceived, ByteBuffer.wrap(contentChunk.getPayload())); - bodyLengthReceived += contentChunk.getSize(); - } + ContentBody contentChunk = _currentMessage.getContentChunk(i); + handle.addContent(ByteBuffer.wrap(contentChunk.getPayload())); + bodyLengthReceived += contentChunk.getSize(); } + } + final StoredMessage<MessageMetaData> storedMessage = handle.allContentAdded(); + + final AMQMessage amqMessage = createAMQMessage(storedMessage); + MessageReference reference = amqMessage.newReference(); + try + { _currentMessage = null; @@ -500,7 +503,7 @@ public class AMQChannel .createBasicAckBody(_confirmedMessageCounter, false); _connection.writeFrame(responseBody.generateFrame(_channelId)); } - incrementUncommittedMessageSize(handle); + incrementUncommittedMessageSize(storedMessage); incrementOutstandingTxnsIfNecessary(); } } @@ -1512,7 +1515,7 @@ public class AMQChannel try { entry.delete(); - txn.dequeue(queue, message, + txn.dequeue(entry.getEnqueueRecord(), new ServerTransaction.Action() { @Override diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 5f7d5fe46e..d46ed719b2 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -189,7 +189,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen // The send may of course still fail, in which case, as // the message is unacked, it will be lost. - _txn.dequeue(entry.getOwningResource(), entry.getMessage(), NOOP); + _txn.dequeue(entry.getEnqueueRecord(), NOOP); ServerMessage message = entry.getMessage(); MessageReference ref = message.newReference(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java index 55746dfa95..ef42a1311f 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java @@ -88,12 +88,6 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter } @Override - public void addContent(int offsetInMessage, ByteBuffer src) - { - throw new UnsupportedOperationException(); - } - - @Override public int getContent(int offsetInMessage, ByteBuffer dst) { int size = messageContent.length - offsetInMessage; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java index 55fc865850..406566cd4c 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java @@ -33,6 +33,7 @@ import org.apache.qpid.framing.MessagePublishInfo; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -105,22 +106,21 @@ public class AckTest extends QpidTestCase final MessageMetaData mmd = new MessageMetaData(publishBody,cb, System.currentTimeMillis()); - final StoredMessage<MessageMetaData> result =_messageStore.addMessage(mmd); + final StoredMessage<MessageMetaData> result =_messageStore.addMessage(mmd).allContentAdded(); final StoredMessage storedMessage = result; final AMQMessage message = new AMQMessage(storedMessage); ServerTransaction txn = new AutoCommitTransaction(_messageStore); txn.enqueue(_queue, message, - new ServerTransaction.Action() + new ServerTransaction.EnqueueAction() { - public void postCommit() + public void postCommit(MessageEnqueueRecord... records) { - _queue.enqueue(message,null); + _queue.enqueue(message,null, null); } public void onRollback() { - //To change body of implemented methods use File | Settings | File Templates. } }); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java index dcdac07eae..7dd4734e6b 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java @@ -178,7 +178,7 @@ public class AcknowledgeTest extends QpidTestCase private void checkStoreContents(int messageCount) { MessageCounter counter = new MessageCounter(); - _messageStore.visitMessages(counter); + _messageStore.newMessageStoreReader().visitMessages(counter); assertEquals("Message header count incorrect in the MetaDataMap", messageCount, counter.getCount()); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java index f21bf07fae..c6aea39aa6 100755 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java @@ -26,9 +26,10 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.MessagePublishInfo; +import org.apache.qpid.server.store.MessageHandle; import org.apache.qpid.server.store.StoredMessage; -public class MockStoredMessage implements StoredMessage<MessageMetaData> +public class MockStoredMessage implements StoredMessage<MessageMetaData>, MessageHandle<MessageMetaData> { private long _messageId; private MessageMetaData _metaData; @@ -72,12 +73,17 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData> return _messageId; } - public void addContent(int offsetInMessage, ByteBuffer src) + public void addContent(ByteBuffer src) { src = src.duplicate(); - ByteBuffer dst = _content.duplicate(); - dst.position(offsetInMessage); - dst.put(src); + _content.put(src); + } + + @Override + public StoredMessage<MessageMetaData> allContentAdded() + { + _content.flip(); + return this; } public int getContent(int offset, ByteBuffer dst) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java index 816485c42b..264350ff8d 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java @@ -135,7 +135,7 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase private void checkStoreContents(int messageCount) { MessageCounter counter = new MessageCounter(); - _messageStore.visitMessages(counter); + _messageStore.newMessageStoreReader().visitMessages(counter); assertEquals("Message header count incorrect in the MetaDataMap", messageCount, counter.getCount()); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java index 6059f7f4e5..de3b68a0bc 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java @@ -60,7 +60,7 @@ public class ReferenceCountingTest extends QpidTestCase final MessageMetaData mmd = new MessageMetaData(info, chb); - StoredMessage storedMessage = _store.addMessage(mmd); + StoredMessage storedMessage = _store.addMessage(mmd).allContentAdded(); Transaction txn = _store.newTransaction(); txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage)); txn.commitTran(); @@ -78,7 +78,7 @@ public class ReferenceCountingTest extends QpidTestCase private int getStoreMessageCount() { MessageCounter counter = new MessageCounter(); - _store.visitMessages(counter); + _store.newMessageStoreReader().visitMessages(counter); return counter.getCount(); } @@ -99,7 +99,7 @@ public class ReferenceCountingTest extends QpidTestCase final MessageMetaData mmd = new MessageMetaData(info, chb); - StoredMessage storedMessage = _store.addMessage(mmd); + StoredMessage storedMessage = _store.addMessage(mmd).allContentAdded(); Transaction txn = _store.newTransaction(); txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage)); txn.commitTran(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index fa2e543f8d..680c05ec0a 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -390,7 +390,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget if(outcome instanceof Accepted) { _queueEntry.lockAcquisition(); - txn.dequeue(_queueEntry.getOwningResource(), _queueEntry.getMessage(), + txn.dequeue(_queueEntry.getEnqueueRecord(), new ServerTransaction.Action() { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java index 3572b98cad..e9e81bb623 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java @@ -235,12 +235,6 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement } @Override - public void addContent(int offsetInMessage, ByteBuffer src) - { - throw new UnsupportedOperationException(); - } - - @Override public int getContent(int offsetInMessage, ByteBuffer dst) { ByteBuffer buf = allData.duplicate(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java index d0843c349d..e8e31dcd15 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java @@ -24,6 +24,7 @@ import org.apache.qpid.amqp_1_0.type.Outcome; import org.apache.qpid.amqp_1_0.type.messaging.Accepted; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.txn.ServerTransaction; public class QueueDestination extends MessageSourceDestination implements SendingDestination, ReceivingDestination @@ -45,16 +46,16 @@ public class QueueDestination extends MessageSourceDestination implements Sendin public Outcome send(final Message_1_0 message, ServerTransaction txn) { - txn.enqueue(getQueue(),message, new ServerTransaction.Action() + txn.enqueue(getQueue(),message, new ServerTransaction.EnqueueAction() { MessageReference _reference = message.newReference(); - public void postCommit() + public void postCommit(MessageEnqueueRecord... records) { try { - getQueue().enqueue(message, null); + getQueue().enqueue(message, null, records[0]); } finally { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java index 83430e6008..5deec2bb35 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java @@ -42,6 +42,7 @@ import org.apache.qpid.amqp_1_0.type.transport.Detach; import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.store.MessageHandle; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -150,17 +151,17 @@ public class ReceivingLink_1_0 implements ReceivingLinkListener, Link_1_0, Deliv _sectionDecoder, immutableSections); - StoredMessage<MessageMetaData_1_0> storedMessage = _vhost.getMessageStore().addMessage(mmd); + MessageHandle<MessageMetaData_1_0> handle = _vhost.getMessageStore().addMessage(mmd); boolean skipping = true; int offset = 0; for(ByteBuffer bareMessageBuf : immutableSections) { - storedMessage.addContent(offset, bareMessageBuf.duplicate()); + handle.addContent(bareMessageBuf.duplicate()); offset += bareMessageBuf.remaining(); } - + final StoredMessage<MessageMetaData_1_0> storedMessage = handle.allContentAdded(); Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession().getConnection().getReference()); MessageReference<Message_1_0> reference = message.newReference(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 0353b9375d..ec57284a78 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -410,59 +410,40 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS if(Boolean.TRUE.equals(detach.getClosed()) || !(TerminusDurability.UNSETTLED_STATE.equals(_durability)|| TerminusDurability.CONFIGURATION.equals(_durability))) { - while(!_consumer.trySendLock()) - { - synchronized (endpoint.getLock()) - { - try - { - endpoint.getLock().wait(100); - } - catch (InterruptedException e) - { - } - } - } - try - { - _consumer.close(); + _consumer.close(); - Modified state = new Modified(); - state.setDeliveryFailed(true); + Modified state = new Modified(); + state.setDeliveryFailed(true); - for(UnsettledAction action : _unsettledActionMap.values()) - { + for(UnsettledAction action : _unsettledActionMap.values()) + { - action.process(state,Boolean.TRUE); - } - _unsettledActionMap.clear(); + action.process(state,Boolean.TRUE); + } + _unsettledActionMap.clear(); - endpoint.close(); + endpoint.close(); - if(_destination instanceof ExchangeDestination - && (_durability == TerminusDurability.CONFIGURATION - || _durability == TerminusDurability.UNSETTLED_STATE)) + if(_destination instanceof ExchangeDestination + && (_durability == TerminusDurability.CONFIGURATION + || _durability == TerminusDurability.UNSETTLED_STATE)) + { + try { - try - { - _vhost.removeQueue((AMQQueue)_queue); - } - catch (AccessControlException e) - { - //TODO - _logger.error("Error registering subscription", e); - } + _vhost.removeQueue((AMQQueue)_queue); } - - if(_closeAction != null) + catch (AccessControlException e) { - _closeAction.run(); + //TODO + _logger.error("Error registering subscription", e); } } - finally + + if(_closeAction != null) { - _consumer.releaseSendLock(); + _closeAction.run(); } + } else if(detach.getError() != null && !_linkAttachment.getEndpoint().getSession().isSyntheticError(detach.getError())) diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java index 0c47ddf137..789789ac33 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java @@ -87,12 +87,6 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1 } @Override - public void addContent(int offsetInMessage, ByteBuffer src) - { - throw new UnsupportedOperationException(); - } - - @Override public int getContent(int offsetInMessage, ByteBuffer dst) { int size = messageContent.length - offsetInMessage; diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java index 02c3373f85..a9637e9d4e 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java @@ -192,12 +192,6 @@ public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTra } @Override - public void addContent(int offsetInMessage, ByteBuffer src) - { - throw new UnsupportedOperationException(); - } - - @Override public int getContent(int offsetInMessage, ByteBuffer dst) { return message.getContent(dst, offsetInMessage); diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java index fbc809305e..ee16d96d5b 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java @@ -81,12 +81,6 @@ public class MessageConverter_0_8_to_0_10 implements MessageConverter<AMQMessag } @Override - public void addContent(int offsetInMessage, ByteBuffer src) - { - throw new UnsupportedOperationException(); - } - - @Override public int getContent(int offsetInMessage, ByteBuffer dst) { return message_0_8.getContent(dst, offsetInMessage); diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java index 647e6be28a..d4529aedb1 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java @@ -89,12 +89,6 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_ } @Override - public void addContent(int offsetInMessage, ByteBuffer src) - { - throw new UnsupportedOperationException(); - } - - @Override public int getContent(int offsetInMessage, ByteBuffer dst) { int size = messageContent.length - offsetInMessage; diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java index 8a3ef65979..8e64757cbb 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java @@ -59,6 +59,7 @@ import org.apache.qpid.server.plugin.SystemNodeCreator; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.store.MessageDurability; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -1071,6 +1072,12 @@ class ManagementNode implements MessageSource, MessageDestination } @Override + public MessageEnqueueRecord getEnqueueRecord() + { + return null; + } + + @Override public boolean isAcquiredBy(final ConsumerImpl consumer) { return false; diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java index 501ce40db7..dd5b7540a7 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.message.internal.InternalMessage; +import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; @@ -90,6 +91,12 @@ class ManagementResponse implements MessageInstance } @Override + public MessageEnqueueRecord getEnqueueRecord() + { + return null; + } + + @Override public boolean isAcquiredBy(final ConsumerImpl consumer) { return consumer == _consumer && !isDeleted(); diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java index e30571615c..4639236d60 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java @@ -256,6 +256,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); root.setContextPath("/"); + root.setCompactPath(true); server.setHandler(root); server.setSendServerVersion(false); final ErrorHandler errorHandler = new ErrorHandler() diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java index ab7d84e7fa..673842e387 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java @@ -603,7 +603,7 @@ public class VirtualHostMessageStoreTest extends QpidTestCase MessageMetaData mmd = new MessageMetaData(messageInfo, headerBody, System.currentTimeMillis()); - final StoredMessage<MessageMetaData> storedMessage = _virtualHost.getMessageStore().addMessage(mmd); + final StoredMessage<MessageMetaData> storedMessage = _virtualHost.getMessageStore().addMessage(mmd).allContentAdded(); final AMQMessage currentMessage = new AMQMessage(storedMessage); |