summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-04-03 22:21:05 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-04-03 22:21:05 +0000
commit8bcfb7bb278644a547bddf4719265d806ea69d72 (patch)
tree6aa69e2381cb2c7d71cbe59cbc8ed8005667937b
parent47f4f5148f7a6f4fa3c214cc2efd4e4a3f44641c (diff)
downloadqpid-python-8bcfb7bb278644a547bddf4719265d806ea69d72.tar.gz
QPID-6476 : [Java Broker] Refactor MessageStore to allow more efficient implementations
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1671184 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java453
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java10
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java67
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java27
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java7
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java7
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java32
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java35
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java38
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java18
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java12
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java19
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java23
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java7
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntry.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java535
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java231
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageEnqueueRecord.java29
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageHandle.java32
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java27
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java17
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java46
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java37
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/DistributedTransactionHandler.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageInstanceHandler.java5
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java115
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java90
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java67
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java87
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java131
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java22
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java40
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java102
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java108
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java60
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java26
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java6
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java7
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java18
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java8
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java42
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java5
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java32
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java6
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java8
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java7
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java195
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java29
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java13
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java38
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java28
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockAction.java12
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java45
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java7
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java136
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java8
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java33
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java10
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java2
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java16
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java6
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java2
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java6
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java7
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java7
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java63
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java6
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java6
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java6
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java6
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java7
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java7
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java1
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java2
99 files changed, 2292 insertions, 1218 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index c668cc8595..b030b6c091 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -26,6 +26,7 @@ import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.UUID;
@@ -48,6 +49,8 @@ import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.EventManager;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
+import org.apache.qpid.server.store.MessageHandle;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
@@ -114,7 +117,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
@Override
- public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
+ public <T extends StorableMessageMetaData> MessageHandle<T> addMessage(T metaData)
{
long newMessageId = getNextMessageId();
@@ -163,154 +166,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
@Override
- public void visitMessages(final MessageHandler handler) throws StoreException
+ public MessageStoreReader newMessageStoreReader()
{
- checkMessageStoreOpen();
- visitMessagesInternal(handler, getEnvironmentFacade());
- }
-
- @Override
- public StoredMessage<?> getMessage(final long messageId)
- {
- checkMessageStoreOpen();
- return getMessageInternal(messageId, getEnvironmentFacade());
- }
-
- @Override
- public void visitMessageInstances(final TransactionLogResource queue, final MessageInstanceHandler handler) throws StoreException
- {
- checkMessageStoreOpen();
-
- Cursor cursor = null;
- List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
- try
- {
- cursor = getDeliveryDb().openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
-
- QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
- keyBinding.objectToEntry(new QueueEntryKey(queue.getId(),0l), key);
-
- if(cursor.getSearchKeyRange(key,value,LockMode.DEFAULT) == OperationStatus.SUCCESS)
- {
- QueueEntryKey entry = keyBinding.entryToObject(key);
- if(entry.getQueueId().equals(queue.getId()))
- {
- entries.add(entry);
- }
- while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS)
- {
- entry = keyBinding.entryToObject(key);
- if(entry.getQueueId().equals(queue.getId()))
- {
- entries.add(entry);
- }
- else
- {
- break;
- }
- }
- }
- }
- catch (RuntimeException e)
- {
- throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e);
- }
- finally
- {
- closeCursorSafely(cursor, getEnvironmentFacade());
- }
-
- for(QueueEntryKey entry : entries)
- {
- UUID queueId = entry.getQueueId();
- long messageId = entry.getMessageId();
- if (!handler.handle(queueId, messageId))
- {
- break;
- }
- }
-
- }
-
-
-
- @Override
- public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException
- {
- checkMessageStoreOpen();
-
- Cursor cursor = null;
- List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
- try
- {
- cursor = getDeliveryDb().openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
-
- DatabaseEntry value = new DatabaseEntry();
- while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS)
- {
- QueueEntryKey entry = keyBinding.entryToObject(key);
- entries.add(entry);
- }
- }
- catch (RuntimeException e)
- {
- throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e);
- }
- finally
- {
- closeCursorSafely(cursor, getEnvironmentFacade());
- }
-
- for(QueueEntryKey entry : entries)
- {
- UUID queueId = entry.getQueueId();
- long messageId = entry.getMessageId();
- if (!handler.handle(queueId, messageId))
- {
- break;
- }
- }
-
- }
-
- @Override
- public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException
- {
- checkMessageStoreOpen();
-
- Cursor cursor = null;
- try
- {
- cursor = getXidDb().openCursor(null, null);
- DatabaseEntry key = new DatabaseEntry();
- XidBinding keyBinding = XidBinding.getInstance();
- PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
- DatabaseEntry value = new DatabaseEntry();
-
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- Xid xid = keyBinding.entryToObject(key);
- PreparedTransaction preparedTransaction = valueBinding.entryToObject(value);
- if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
- preparedTransaction.getEnqueues(), preparedTransaction.getDequeues()))
- {
- break;
- }
- }
-
- }
- catch (RuntimeException e)
- {
- throw getEnvironmentFacade().handleDatabaseException("Cannot recover distributed transactions", e);
- }
- finally
- {
- closeCursorSafely(cursor, getEnvironmentFacade());
- }
+ return new BDBMessageStoreReader();
}
/**
@@ -793,24 +651,23 @@ public abstract class AbstractBDBMessageStore implements MessageStore
* Extracts a message from a specified queue, in a given transaction.
*
* @param tx The transaction for the operation.
- * @param queue The queue to take the message from.
+ * @param queueId The id of the queue to take the message from.
* @param messageId The message to dequeue.
*
* @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- private void dequeueMessage(final Transaction tx, final TransactionLogResource queue,
+ private void dequeueMessage(final Transaction tx, final UUID queueId,
long messageId) throws StoreException
{
DatabaseEntry key = new DatabaseEntry();
QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
- QueueEntryKey queueEntryKey = new QueueEntryKey(queue.getId(), messageId);
- UUID id = queue.getId();
+ QueueEntryKey queueEntryKey = new QueueEntryKey(queueId, messageId);
+ UUID id = queueId;
keyBinding.objectToEntry(queueEntryKey, key);
if (getLogger().isDebugEnabled())
{
- getLogger().debug("Dequeue message id " + messageId + " from queue "
- + queue.getName() + " with id " + id);
+ getLogger().debug("Dequeue message id " + messageId + " from queue with id " + id);
}
try
@@ -819,19 +676,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore
OperationStatus status = getDeliveryDb().delete(tx, key);
if (status == OperationStatus.NOTFOUND)
{
- throw new StoreException("Unable to find message with id " + messageId + " on queue "
- + queue.getName() + " with id " + id);
+ throw new StoreException("Unable to find message with id " + messageId + " on queue with id " + id);
}
else if (status != OperationStatus.SUCCESS)
{
- throw new StoreException("Unable to remove message with id " + messageId + " on queue"
- + queue.getName() + " with id " + id);
+ throw new StoreException("Unable to remove message with id " + messageId + " on queue with id " + id);
}
if (getLogger().isDebugEnabled())
{
- getLogger().debug("Removed message " + messageId + " on queue "
- + queue.getName() + " with id " + id);
+ getLogger().debug("Removed message " + messageId + " on queue with id " + id);
}
}
@@ -849,8 +703,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore
long format,
byte[] globalId,
byte[] branchId,
- org.apache.qpid.server.store.Transaction.Record[] enqueues,
- org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException
+ org.apache.qpid.server.store.Transaction.EnqueueRecord[] enqueues,
+ org.apache.qpid.server.store.Transaction.DequeueRecord[] dequeues) throws StoreException
{
DatabaseEntry key = new DatabaseEntry();
Xid xid = new Xid(format, globalId, branchId);
@@ -862,7 +716,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
valueBinding.objectToEntry(preparedTransaction, value);
List<Runnable> postActions = new ArrayList<>();
- for(org.apache.qpid.server.store.Transaction.Record enqueue : enqueues)
+ for(org.apache.qpid.server.store.Transaction.EnqueueRecord enqueue : enqueues)
{
StoredMessage storedMessage = enqueue.getMessage().getStoredMessage();
if(storedMessage instanceof StoredBDBMessage)
@@ -1187,7 +1041,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
- final class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
+ final class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T>, MessageHandle<T>
{
private final long _messageId;
@@ -1234,7 +1088,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
@Override
- public void addContent(int offsetInMessage, ByteBuffer src)
+ public void addContent(ByteBuffer src)
{
src = src.slice();
byte[] data = _messageDataRef.getData();
@@ -1259,6 +1113,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
@Override
+ public StoredMessage<T> allContentAdded()
+ {
+ return this;
+ }
+
+ @Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
byte[] data = _messageDataRef.getData();
@@ -1466,7 +1326,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
@Override
- public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException
+ public MessageEnqueueRecord enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException
{
checkMessageStoreOpen();
@@ -1487,14 +1347,16 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
+ return new BDBEnqueueRecord(queue.getId(), message.getMessageNumber());
}
@Override
- public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) throws StoreException
+ public void dequeueMessage(final MessageEnqueueRecord enqueueRecord)
{
checkMessageStoreOpen();
- AbstractBDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber());
+ AbstractBDBMessageStore.this.dequeueMessage(_txn, enqueueRecord.getQueueId(),
+ enqueueRecord.getMessageNumber());
}
@Override
@@ -1546,21 +1408,268 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
@Override
- public void removeXid(long format, byte[] globalId, byte[] branchId) throws StoreException
+ public void removeXid(final StoredXidRecord record)
{
checkMessageStoreOpen();
- AbstractBDBMessageStore.this.removeXid(_txn, format, globalId, branchId);
+ AbstractBDBMessageStore.this.removeXid(_txn, record.getFormat(), record.getGlobalId(), record.getBranchId());
}
@Override
- public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues,
- Record[] dequeues) throws StoreException
+ public StoredXidRecord recordXid(final long format, final byte[] globalId, final byte[] branchId, final EnqueueRecord[] enqueues,
+ final DequeueRecord[] dequeues) throws StoreException
{
checkMessageStoreOpen();
_postCommitActions.addAll(AbstractBDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues));
+ return new BDBStoredXidRecord(format, globalId, branchId);
}
+
}
+ private static class BDBStoredXidRecord implements org.apache.qpid.server.store.Transaction.StoredXidRecord
+ {
+ private final long _format;
+ private final byte[] _globalId;
+ private final byte[] _branchId;
+
+ public BDBStoredXidRecord(final long format, final byte[] globalId, final byte[] branchId)
+ {
+ _format = format;
+ _globalId = globalId;
+ _branchId = branchId;
+ }
+
+ @Override
+ public long getFormat()
+ {
+ return _format;
+ }
+
+ @Override
+ public byte[] getGlobalId()
+ {
+ return _globalId;
+ }
+
+ @Override
+ public byte[] getBranchId()
+ {
+ return _branchId;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final BDBStoredXidRecord that = (BDBStoredXidRecord) o;
+
+ return _format == that._format
+ && Arrays.equals(_globalId, that._globalId)
+ && Arrays.equals(_branchId, that._branchId);
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = (int) (_format ^ (_format >>> 32));
+ result = 31 * result + Arrays.hashCode(_globalId);
+ result = 31 * result + Arrays.hashCode(_branchId);
+ return result;
+ }
+ }
+ public static class BDBEnqueueRecord implements MessageEnqueueRecord
+ {
+ private final UUID _queueId;
+
+ private final long _messageNumber;
+
+ public BDBEnqueueRecord(final UUID queueid, final long messageNumber)
+ {
+ _queueId = queueid;
+ _messageNumber = messageNumber;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageNumber;
+ }
+
+ public UUID getQueueId()
+ {
+ return _queueId;
+ }
+
+ }
+
+ private class BDBMessageStoreReader implements MessageStoreReader
+ {
+ @Override
+ public void visitMessages(final MessageHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+ visitMessagesInternal(handler, getEnvironmentFacade());
+ }
+
+ @Override
+ public StoredMessage<?> getMessage(final long messageId)
+ {
+ checkMessageStoreOpen();
+ return getMessageInternal(messageId, getEnvironmentFacade());
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
+
+ @Override
+ public void visitMessageInstances(final TransactionLogResource queue, final MessageInstanceHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ Cursor cursor = null;
+ List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
+ try
+ {
+ cursor = getDeliveryDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+ keyBinding.objectToEntry(new QueueEntryKey(queue.getId(),0l), key);
+
+ if(cursor.getSearchKeyRange(key,value,LockMode.DEFAULT) == OperationStatus.SUCCESS)
+ {
+ QueueEntryKey entry = keyBinding.entryToObject(key);
+ if(entry.getQueueId().equals(queue.getId()))
+ {
+ entries.add(entry);
+ }
+ while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS)
+ {
+ entry = keyBinding.entryToObject(key);
+ if(entry.getQueueId().equals(queue.getId()))
+ {
+ entries.add(entry);
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+ }
+ catch (RuntimeException e)
+ {
+ throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e);
+ }
+ finally
+ {
+ closeCursorSafely(cursor, getEnvironmentFacade());
+ }
+
+ for(QueueEntryKey entry : entries)
+ {
+ UUID queueId = entry.getQueueId();
+ long messageId = entry.getMessageId();
+ if (!handler.handle(new BDBEnqueueRecord(queueId, messageId)))
+ {
+ break;
+ }
+ }
+
+ }
+
+
+
+ @Override
+ public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ Cursor cursor = null;
+ List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
+ try
+ {
+ cursor = getDeliveryDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+
+ DatabaseEntry value = new DatabaseEntry();
+ while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS)
+ {
+ QueueEntryKey entry = keyBinding.entryToObject(key);
+ entries.add(entry);
+ }
+ }
+ catch (RuntimeException e)
+ {
+ throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e);
+ }
+ finally
+ {
+ closeCursorSafely(cursor, getEnvironmentFacade());
+ }
+
+ for(QueueEntryKey entry : entries)
+ {
+ UUID queueId = entry.getQueueId();
+ long messageId = entry.getMessageId();
+ if (!handler.handle(new BDBEnqueueRecord(queueId, messageId)))
+ {
+ break;
+ }
+ }
+
+ }
+
+ @Override
+ public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ Cursor cursor = null;
+ try
+ {
+ cursor = getXidDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ XidBinding keyBinding = XidBinding.getInstance();
+ PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
+ DatabaseEntry value = new DatabaseEntry();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ Xid xid = keyBinding.entryToObject(key);
+ PreparedTransaction preparedTransaction = valueBinding.entryToObject(value);
+ if (!handler.handle(new BDBStoredXidRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId()),
+ preparedTransaction.getEnqueues(), preparedTransaction.getDequeues()))
+ {
+ break;
+ }
+ }
+
+ }
+ catch (RuntimeException e)
+ {
+ throw getEnvironmentFacade().handleDatabaseException("Cannot recover distributed transactions", e);
+ }
+ finally
+ {
+ closeCursorSafely(cursor, getEnvironmentFacade());
+ }
+ }
+
+
+ }
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java
index eb5c4677ff..9ecc5b3283 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java
@@ -25,21 +25,21 @@ import org.apache.qpid.server.store.Transaction;
public class PreparedTransaction
{
- private final Transaction.Record[] _enqueues;
- private final Transaction.Record[] _dequeues;
+ private final Transaction.EnqueueRecord[] _enqueues;
+ private final Transaction.DequeueRecord[] _dequeues;
- public PreparedTransaction(Transaction.Record[] enqueues, Transaction.Record[] dequeues)
+ public PreparedTransaction(Transaction.EnqueueRecord[] enqueues, Transaction.DequeueRecord[] dequeues)
{
_enqueues = enqueues;
_dequeues = dequeues;
}
- public Transaction.Record[] getEnqueues()
+ public Transaction.EnqueueRecord[] getEnqueues()
{
return _enqueues;
}
- public Transaction.Record[] getDequeues()
+ public Transaction.DequeueRecord[] getDequeues()
{
return _dequeues;
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
index 1f4cf45ce1..4d111e5c4f 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
@@ -28,9 +28,11 @@ import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore;
import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction>
@@ -38,23 +40,34 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction
@Override
public PreparedTransaction entryToObject(TupleInput input)
{
- Transaction.Record[] enqueues = readRecords(input);
+ Transaction.EnqueueRecord[] enqueues = readEnqueueRecords(input);
- Transaction.Record[] dequeues = readRecords(input);
+ Transaction.DequeueRecord[] dequeues = readDequeueRecords(input);
return new PreparedTransaction(enqueues, dequeues);
}
- private Transaction.Record[] readRecords(TupleInput input)
+ private Transaction.EnqueueRecord[] readEnqueueRecords(TupleInput input)
{
- Transaction.Record[] records = new Transaction.Record[input.readInt()];
+ Transaction.EnqueueRecord[] records = new Transaction.EnqueueRecord[input.readInt()];
for(int i = 0; i < records.length; i++)
{
- records[i] = new RecordImpl(new UUID(input.readLong(), input.readLong()), input.readLong());
+ records[i] = new EnqueueRecordImpl(new UUID(input.readLong(), input.readLong()), input.readLong());
}
return records;
}
+ private Transaction.DequeueRecord[] readDequeueRecords(TupleInput input)
+ {
+ Transaction.DequeueRecord[] records = new Transaction.DequeueRecord[input.readInt()];
+ for(int i = 0; i < records.length; i++)
+ {
+ records[i] = new DequeueRecordImpl(new UUID(input.readLong(), input.readLong()), input.readLong());
+ }
+ return records;
+ }
+
+
@Override
public void objectToEntry(PreparedTransaction preparedTransaction, TupleOutput output)
{
@@ -63,7 +76,7 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction
}
- private void writeRecords(Transaction.Record[] records, TupleOutput output)
+ private void writeRecords(Transaction.EnqueueRecord[] records, TupleOutput output)
{
if(records == null)
{
@@ -72,7 +85,7 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction
else
{
output.writeInt(records.length);
- for(Transaction.Record record : records)
+ for(Transaction.EnqueueRecord record : records)
{
UUID id = record.getResource().getId();
output.writeLong(id.getMostSignificantBits());
@@ -82,13 +95,32 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction
}
}
- private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueueableMessage
+ private void writeRecords(Transaction.DequeueRecord[] records, TupleOutput output)
+ {
+ if(records == null)
+ {
+ output.writeInt(0);
+ }
+ else
+ {
+ output.writeInt(records.length);
+ for(Transaction.DequeueRecord record : records)
+ {
+ UUID id = record.getEnqueueRecord().getQueueId();
+ output.writeLong(id.getMostSignificantBits());
+ output.writeLong(id.getLeastSignificantBits());
+ output.writeLong(record.getEnqueueRecord().getMessageNumber());
+ }
+ }
+ }
+
+ private static class EnqueueRecordImpl implements Transaction.EnqueueRecord, TransactionLogResource, EnqueueableMessage
{
private long _messageNumber;
private UUID _queueId;
- public RecordImpl(UUID queueId, long messageNumber)
+ public EnqueueRecordImpl(UUID queueId, long messageNumber)
{
_messageNumber = messageNumber;
_queueId = queueId;
@@ -137,4 +169,21 @@ public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction
return MessageDurability.DEFAULT;
}
}
+
+ private static class DequeueRecordImpl implements Transaction.DequeueRecord
+ {
+
+ private final AbstractBDBMessageStore.BDBEnqueueRecord _record;
+
+ public DequeueRecordImpl(final UUID queueId, final long messageNumber)
+ {
+ _record = new AbstractBDBMessageStore.BDBEnqueueRecord(queueId, messageNumber);
+ }
+
+ @Override
+ public MessageEnqueueRecord getEnqueueRecord()
+ {
+ return _record;
+ }
+ }
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index fb0c11f6e5..3f8c1a7a99 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -36,6 +36,7 @@ import org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10;
import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
import org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8;
+import org.apache.qpid.server.store.MessageHandle;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreTestCase;
import org.apache.qpid.server.store.StorableMessageMetaData;
@@ -104,14 +105,15 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize);
MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8);
- StoredMessage<MessageMetaData> storedMessage_0_8 = bdbStore.addMessage(messageMetaData_0_8);
+ MessageHandle<MessageMetaData> messageHandle_0_8 = bdbStore.addMessage(messageMetaData_0_8);
long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime();
- long messageid_0_8 = storedMessage_0_8.getMessageNumber();
- storedMessage_0_8.addContent(0, firstContentBytes_0_8);
- storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8);
- ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore();
+ messageHandle_0_8.addContent(firstContentBytes_0_8);
+ messageHandle_0_8.addContent(secondContentBytes_0_8);
+ final StoredMessage<MessageMetaData> storedMessage_0_8 = messageHandle_0_8.allContentAdded();
+ long messageid_0_8 = storedMessage_0_8.getMessageNumber();
+ ((AbstractBDBMessageStore.StoredBDBMessage)messageHandle_0_8).flushToStore();
/*
* Create and insert a 0-10 message (metadata and content)
@@ -124,13 +126,14 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10);
MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10);
- StoredMessage<MessageMetaData_0_10> storedMessage_0_10 = bdbStore.addMessage(messageMetaData_0_10);
+ MessageHandle<MessageMetaData_0_10> messageHandle_0_10 = bdbStore.addMessage(messageMetaData_0_10);
long origArrivalTime_0_10 = messageMetaData_0_10.getArrivalTime();
- long messageid_0_10 = storedMessage_0_10.getMessageNumber();
- storedMessage_0_10.addContent(0, completeContentBody_0_10);
- ((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_10).flushToStore();
+ messageHandle_0_10.addContent(completeContentBody_0_10);
+ final StoredMessage<MessageMetaData_0_10> storedMessage_0_10 = messageHandle_0_10.allContentAdded();
+ long messageid_0_10 = storedMessage_0_10.getMessageNumber();
+ ((AbstractBDBMessageStore.StoredBDBMessage)messageHandle_0_10).flushToStore();
/*
* reload the store only (read-only)
@@ -352,12 +355,12 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize);
MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8);
- StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8);
+ MessageHandle<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8);
- storedMessage_0_8.addContent(0, chunk1);
+ storedMessage_0_8.addContent(chunk1);
((AbstractBDBMessageStore.StoredBDBMessage)storedMessage_0_8).flushToStore();
- return storedMessage_0_8;
+ return storedMessage_0_8.allContentAdded();
}
public void testOnDelete() throws Exception
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index 321f8b28e6..ccae965edb 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.util.StateChangeListener;
+import org.apache.qpid.thread.ThreadFactory;
public abstract class AbstractConsumerTarget implements ConsumerTarget
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index ac623bc337..6fc940d665 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -66,6 +66,7 @@ import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
@@ -522,17 +523,17 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
baseQueues = queues.toArray(new BaseQueue[queues.size()]);
}
- txn.enqueue(queues,message, new ServerTransaction.Action()
+ txn.enqueue(queues,message, new ServerTransaction.EnqueueAction()
{
MessageReference _reference = message.newReference();
- public void postCommit()
+ public void postCommit(MessageEnqueueRecord... records)
{
try
{
for(int i = 0; i < baseQueues.length; i++)
{
- baseQueues[i].enqueue(message, postEnqueueAction);
+ baseQueues[i].enqueue(message, postEnqueueAction, records[i]);
}
}
finally
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
index 7cf1963331..e2dc1bf754 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
@@ -25,6 +25,7 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
@@ -82,15 +83,15 @@ public class DefaultDestination implements MessageDestination
}
else
{
- txn.enqueue(q,message, new ServerTransaction.Action()
+ txn.enqueue(q,message, new ServerTransaction.EnqueueAction()
{
MessageReference _reference = message.newReference();
- public void postCommit()
+ public void postCommit(MessageEnqueueRecord... records)
{
try
{
- q.enqueue(message, postEnqueueAction);
+ q.enqueue(message, postEnqueueAction, records[0]);
}
finally
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
index d3518f428b..1ba506fb7e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.message;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
@@ -81,6 +82,8 @@ public interface MessageInstance
ConsumerImpl getAcquiringConsumer();
+ MessageEnqueueRecord getEnqueueRecord();
+
public static enum State
{
AVAILABLE,
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
index 7801cf3d42..4d6216b9df 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
@@ -33,6 +33,7 @@ import java.util.Map;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
+import org.apache.qpid.server.store.MessageHandle;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -134,11 +135,10 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
final InternalMessageMetaData metaData = InternalMessageMetaData.create(persistent, internalHeader, bytes.length);
- StoredMessage<InternalMessageMetaData> handle =
- store.addMessage(metaData);
- handle.addContent(0, ByteBuffer.wrap(bytes));
-
- return new InternalMessage(handle, internalHeader, bodyObject);
+ MessageHandle<InternalMessageMetaData> handle = store.addMessage(metaData);
+ handle.addContent(ByteBuffer.wrap(bytes));
+ StoredMessage<InternalMessageMetaData> storedMessage = handle.allContentAdded();
+ return new InternalMessage(storedMessage, internalHeader, bodyObject);
}
catch (IOException e)
{
@@ -148,7 +148,13 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
public static InternalMessage createStringMessage(MessageStore store, AMQMessageHeader header, String messageBody)
{
- return createMessage(store, header, messageBody, false);
+ return createStringMessage(store, header, messageBody, false);
+ }
+
+
+ public static InternalMessage createStringMessage(MessageStore store, AMQMessageHeader header, String messageBody, boolean persistent)
+ {
+ return createMessage(store, header, messageBody, persistent);
}
public static InternalMessage createMapMessage(MessageStore store, AMQMessageHeader header, Map<? extends Object,? extends Object> messageBody)
@@ -163,7 +169,13 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
public static InternalMessage createBytesMessage(MessageStore store, AMQMessageHeader header, byte[] messageBody)
{
- return createMessage(store, header, messageBody, false);
+ return createBytesMessage(store, header, messageBody, false);
+ }
+
+
+ public static InternalMessage createBytesMessage(MessageStore store, AMQMessageHeader header, byte[] messageBody, boolean persist)
+ {
+ return createMessage(store, header, messageBody, persist);
}
public static InternalMessage convert(long messageNumber, boolean persistent, AMQMessageHeader header, Object messageBody)
@@ -207,12 +219,6 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
}
@Override
- public void addContent(final int offsetInMessage, final ByteBuffer src)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public int getContent(final int offsetInMessage, final ByteBuffer dst)
{
ByteBuffer buffer = ByteBuffer.wrap(bytes);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java
index ed16335cf2..28fe23d11b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java
@@ -24,6 +24,7 @@ import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
@@ -33,14 +34,27 @@ public class InternalMessageMetaData implements StorableMessageMetaData
private boolean _isPersistent;
- private byte[] _headerBytes;
+ private InternalMessageHeader _header;
private int _contentSize;
+ private byte[] _headerBytes;
- public InternalMessageMetaData(final boolean isPersistent, final byte[] headerBytes, final int contentSize)
+ public InternalMessageMetaData(final boolean isPersistent, final InternalMessageHeader header, final int contentSize)
{
_isPersistent = isPersistent;
- _headerBytes = headerBytes;
+ _header = header;
_contentSize = contentSize;
+
+ try(ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ ObjectOutputStream os = new ObjectOutputStream(bytesOut))
+ {
+ os.writeInt(contentSize);
+ os.writeObject(header);
+ _headerBytes = bytesOut.toByteArray();
+ }
+ catch (IOException e)
+ {
+ throw new ConnectionScopedRuntimeException("Unexpected IO Exception on in memory operation", e);
+ }
}
@Override
@@ -76,20 +90,7 @@ public class InternalMessageMetaData implements StorableMessageMetaData
static InternalMessageMetaData create(boolean persistent, final InternalMessageHeader header, int contentSize)
{
- ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
- try
- {
- ObjectOutputStream os = new ObjectOutputStream(bytesOut);
- os.writeObject(header);
- byte[] bytes = bytesOut.toByteArray();
-
- return new InternalMessageMetaData(persistent, bytes, contentSize);
-
- }
- catch (IOException e)
- {
- throw new ConnectionScopedRuntimeException("Unexpected IO Exception on in memory operation", e);
- }
+ return new InternalMessageMetaData(persistent, header, contentSize);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java
index 7fc13fb40b..966e16b22c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaDataType.java
@@ -34,7 +34,7 @@ import org.apache.qpid.util.ByteBufferInputStream;
@PluggableService
public class InternalMessageMetaDataType implements MessageMetaDataType<InternalMessageMetaData>
{
- public static final int INTERNAL_ORDINAL = 999;
+ public static final int INTERNAL_ORDINAL = 255;
public static final String TYPE = "INTERNAL";
@Override
@@ -46,10 +46,13 @@ public class InternalMessageMetaDataType implements MessageMetaDataType<Internal
@Override
public InternalMessageMetaData createMetaData(final ByteBuffer buf)
{
+
+
try(ObjectInputStream is = new ObjectInputStream(new ByteBufferInputStream(buf)))
{
- InternalMessageMetaData metaData = (InternalMessageMetaData) is.readObject();
- return metaData;
+ int contentSize = is.readInt();
+ InternalMessageHeader header = (InternalMessageHeader) is.readObject();
+ return new InternalMessageMetaData(true, header, contentSize);
}
catch (IOException e)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index aa64d6947d..aefb96cc9c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueNotificationListener;
import org.apache.qpid.server.protocol.CapacityChecker;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.util.Deletable;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -112,7 +113,7 @@ public interface AMQQueue<X extends AMQQueue<X>>
void completeRecovery();
- void recover(ServerMessage<?> message);
+ void recover(ServerMessage<?> message, final MessageEnqueueRecord enqueueRecord);
void setTargetSize(long targetSize);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 3b42014136..568fb546b5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -57,7 +57,6 @@ import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.filter.ArrivalTimeFilter;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.logging.EventLogger;
@@ -90,6 +89,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -1046,7 +1046,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
// ------ Enqueue / Dequeue
- public final void enqueue(ServerMessage message, Action<? super MessageInstance> action)
+ public final void enqueue(ServerMessage message, Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
{
incrementQueueCount();
incrementQueueSize(message);
@@ -1060,30 +1060,30 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
if(_recovering.get())
{
- EnqueueRequest request = new EnqueueRequest(message, action);
+ EnqueueRequest request = new EnqueueRequest(message, action, enqueueRecord);
_postRecoveryQueue.add(request);
// deal with the case the recovering status changed just as we added to the post recovery queue
if(!_recovering.get() && _postRecoveryQueue.remove(request))
{
- doEnqueue(message, action);
+ doEnqueue(message, action, enqueueRecord);
}
}
else
{
- doEnqueue(message, action);
+ doEnqueue(message, action, enqueueRecord);
}
}
- public final void recover(ServerMessage message)
+ public final void recover(ServerMessage message, final MessageEnqueueRecord enqueueRecord)
{
incrementQueueCount();
incrementQueueSize(message);
_totalMessagesReceived.incrementAndGet();
- doEnqueue(message, null);
+ doEnqueue(message, null, enqueueRecord);
}
@@ -1107,15 +1107,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
EnqueueRequest request = _postRecoveryQueue.poll();
MessageReference<?> messageReference = request.getMessage();
- doEnqueue(messageReference.getMessage(), request.getAction());
+ doEnqueue(messageReference.getMessage(), request.getAction(), request.getEnqueueRecord());
messageReference.release();
}
}
- protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action)
+ protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
{
final QueueConsumer<?> exclusiveSub = _exclusiveSubscriber;
- final QueueEntry entry = getEntries().add(message);
+ final QueueEntry entry = getEntries().add(message, enqueueRecord);
updateExpiration(entry);
try
@@ -1812,7 +1812,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private void dequeueEntry(final QueueEntry node, ServerTransaction txn)
{
- txn.dequeue(this, node.getMessage(),
+ txn.dequeue(node.getEnqueueRecord(),
new ServerTransaction.Action()
{
@@ -2660,15 +2660,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
if(!message.isReferenced(this))
{
- txn.enqueue(this, message, new ServerTransaction.Action()
+ txn.enqueue(this, message, new ServerTransaction.EnqueueAction()
{
MessageReference _reference = message.newReference();
- public void postCommit()
+ public void postCommit(MessageEnqueueRecord... records)
{
try
{
- AbstractQueue.this.enqueue(message, postEnqueueAction);
+ AbstractQueue.this.enqueue(message, postEnqueueAction, records[0]);
}
finally
{
@@ -3140,10 +3140,13 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
private final MessageReference<?> _message;
private final Action<? super MessageInstance> _action;
+ private final MessageEnqueueRecord _enqueueRecord;
public EnqueueRequest(final ServerMessage message,
- final Action<? super MessageInstance> action)
+ final Action<? super MessageInstance> action,
+ final MessageEnqueueRecord enqueueRecord)
{
+ _enqueueRecord = enqueueRecord;
_message = message.newReference();
_action = action;
}
@@ -3157,5 +3160,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
return _action;
}
+
+ public MessageEnqueueRecord getEnqueueRecord()
+ {
+ return _enqueueRecord;
+ }
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
index 01594e68a9..dc2f45a488 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
@@ -23,12 +23,13 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.Action;
public interface BaseQueue extends TransactionLogResource
{
- void enqueue(ServerMessage message, Action<? super MessageInstance> action);
+ void enqueue(ServerMessage message, Action<? super MessageInstance> action, MessageEnqueueRecord record);
boolean isDurable();
boolean isDeleted();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
index 1e250ac2c9..1e184da750 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -67,9 +68,10 @@ public class LastValueQueueList extends OrderedQueueEntryList
}
@Override
- protected ConflationQueueEntry createQueueEntry(ServerMessage message)
+ protected ConflationQueueEntry createQueueEntry(ServerMessage message,
+ final MessageEnqueueRecord enqueueRecord)
{
- return new ConflationQueueEntry(this, message);
+ return new ConflationQueueEntry(this, message, enqueueRecord);
}
@@ -78,9 +80,9 @@ public class LastValueQueueList extends OrderedQueueEntryList
* Updates the list using super.add and also updates {@link #_latestValuesMap} and discards entries as necessary.
*/
@Override
- public ConflationQueueEntry add(final ServerMessage message)
+ public ConflationQueueEntry add(final ServerMessage message, final MessageEnqueueRecord enqueueRecord)
{
- final ConflationQueueEntry addedEntry = (ConflationQueueEntry) super.add(message);
+ final ConflationQueueEntry addedEntry = (ConflationQueueEntry) super.add(message, enqueueRecord);
final Object keyValue = message.getMessageHeader().getHeader(_conflationKey);
if (keyValue != null)
@@ -173,7 +175,7 @@ public class LastValueQueueList extends OrderedQueueEntryList
if(entry.acquire())
{
ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore());
- txn.dequeue(entry.getQueue(),entry.getMessage(),
+ txn.dequeue(entry.getEnqueueRecord(),
new ServerTransaction.Action()
{
@Override
@@ -201,9 +203,11 @@ public class LastValueQueueList extends OrderedQueueEntryList
super(queueEntryList);
}
- public ConflationQueueEntry(LastValueQueueList queueEntryList, ServerMessage message)
+ public ConflationQueueEntry(LastValueQueueList queueEntryList,
+ ServerMessage message,
+ final MessageEnqueueRecord messageEnqueueRecord)
{
- super(queueEntryList, message);
+ super(queueEntryList, message, messageEnqueueRecord);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java
index 6532b1224b..d45dceb5a8 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -38,14 +39,11 @@ public abstract class OrderedQueueEntry extends QueueEntryImpl
super(queueEntryList);
}
- public OrderedQueueEntry(OrderedQueueEntryList queueEntryList, ServerMessage message, final long entryId)
+ public OrderedQueueEntry(OrderedQueueEntryList queueEntryList,
+ ServerMessage message,
+ final MessageEnqueueRecord messageEnqueueRecord)
{
- super(queueEntryList, message, entryId);
- }
-
- public OrderedQueueEntry(OrderedQueueEntryList queueEntryList, ServerMessage message)
- {
- super(queueEntryList, message);
+ super(queueEntryList, message, messageEnqueueRecord);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
index f3822bc9ca..3e1a535018 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
public abstract class OrderedQueueEntryList implements QueueEntryList
{
@@ -77,9 +78,9 @@ public abstract class OrderedQueueEntryList implements QueueEntryList
}
- public QueueEntry add(ServerMessage message)
+ public QueueEntry add(ServerMessage message, final MessageEnqueueRecord enqueueRecord)
{
- OrderedQueueEntry node = createQueueEntry(message);
+ OrderedQueueEntry node = createQueueEntry(message, enqueueRecord);
for (;;)
{
OrderedQueueEntry tail = _tail;
@@ -104,7 +105,8 @@ public abstract class OrderedQueueEntryList implements QueueEntryList
}
}
- abstract protected OrderedQueueEntry createQueueEntry(ServerMessage<?> message);
+ abstract protected OrderedQueueEntry createQueueEntry(ServerMessage<?> message,
+ final MessageEnqueueRecord enqueueRecord);
@Override
public QueueEntry next(QueueEntry node)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
index d0b5578055..144a9c26e3 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
abstract public class PriorityQueueList extends OrderedQueueEntryList
{
@@ -78,7 +79,7 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList
}
@Override
- public PriorityQueueEntry add(ServerMessage message)
+ public PriorityQueueEntry add(ServerMessage message, final MessageEnqueueRecord enqueueRecord)
{
int index = message.getMessageHeader().getPriority() - _priorityOffset;
if(index >= _priorities)
@@ -89,12 +90,13 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList
{
index = 0;
}
- return (PriorityQueueEntry) _priorityLists[index].add(message);
+ return (PriorityQueueEntry) _priorityLists[index].add(message, enqueueRecord);
}
@Override
- protected PriorityQueueEntry createQueueEntry(final ServerMessage<?> message)
+ protected PriorityQueueEntry createQueueEntry(final ServerMessage<?> message,
+ final MessageEnqueueRecord enqueueRecord)
{
throw new UnsupportedOperationException();
}
@@ -233,9 +235,10 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList
}
@Override
- protected PriorityQueueEntry createQueueEntry(ServerMessage<?> message)
+ protected PriorityQueueEntry createQueueEntry(ServerMessage<?> message,
+ final MessageEnqueueRecord enqueueRecord)
{
- return new PriorityQueueEntry(this, message);
+ return new PriorityQueueEntry(this, message, enqueueRecord);
}
public int getListPriority()
@@ -251,9 +254,11 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList
super(queueEntryList);
}
- public PriorityQueueEntry(PriorityQueueEntrySubList queueEntryList, ServerMessage<?> message)
+ public PriorityQueueEntry(PriorityQueueEntrySubList queueEntryList,
+ ServerMessage<?> message,
+ final MessageEnqueueRecord messageEnqueueRecord)
{
- super(queueEntryList, message);
+ super(queueEntryList, message, messageEnqueueRecord);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 3ddcb98b53..a90e3d677e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.store.MessageStore;
public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
{
@@ -35,6 +36,5 @@ public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
QueueEntry getNextValidEntry();
-
void setExpiration(long calculatedExpiration);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index c798a1ff88..d17e209034 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -37,6 +37,7 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -92,15 +93,20 @@ public abstract class QueueEntryImpl implements QueueEntry
private static final AtomicIntegerFieldUpdater<QueueEntryImpl> _deliveryCountUpdater = AtomicIntegerFieldUpdater
.newUpdater(QueueEntryImpl.class, "_deliveryCount");
+ private final MessageEnqueueRecord _enqueueRecord;
+
public QueueEntryImpl(QueueEntryList queueEntryList)
{
- this(queueEntryList,null,Long.MIN_VALUE);
+ this(queueEntryList,null,Long.MIN_VALUE, null);
_state = DELETED_STATE;
}
- public QueueEntryImpl(QueueEntryList queueEntryList, ServerMessage message, final long entryId)
+ public QueueEntryImpl(QueueEntryList queueEntryList,
+ ServerMessage message,
+ final long entryId,
+ final MessageEnqueueRecord enqueueRecord)
{
_queueEntryList = queueEntryList;
@@ -108,13 +114,17 @@ public abstract class QueueEntryImpl implements QueueEntry
_entryIdUpdater.set(this, entryId);
populateInstanceProperties();
+ _enqueueRecord = enqueueRecord;
}
- public QueueEntryImpl(QueueEntryList queueEntryList, ServerMessage message)
+ public QueueEntryImpl(QueueEntryList queueEntryList,
+ ServerMessage message,
+ final MessageEnqueueRecord enqueueRecord)
{
_queueEntryList = queueEntryList;
_message = message == null ? null : message.newReference(queueEntryList.getQueue());
populateInstanceProperties();
+ _enqueueRecord = enqueueRecord;
}
private void populateInstanceProperties()
@@ -461,7 +471,7 @@ public abstract class QueueEntryImpl implements QueueEntry
enqueues = 0;
}
- txn.dequeue(currentQueue, getMessage(), new ServerTransaction.Action()
+ txn.dequeue(getEnqueueRecord(), new ServerTransaction.Action()
{
public void postCommit()
{
@@ -625,4 +635,9 @@ public abstract class QueueEntryImpl implements QueueEntry
}
+ @Override
+ public MessageEnqueueRecord getEnqueueRecord()
+ {
+ return _enqueueRecord;
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
index 938f170bee..5c7f0e3e28 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
@@ -21,12 +21,13 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
public interface QueueEntryList
{
AMQQueue<?> getQueue();
- QueueEntry add(ServerMessage message);
+ QueueEntry add(ServerMessage message, final MessageEnqueueRecord enqueueRecord);
QueueEntry next(QueueEntry node);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java
index 433005a857..f3d91af39c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java
@@ -20,6 +20,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
/**
* An implementation of QueueEntryImpl to be used in SortedQueueEntryList.
@@ -46,9 +47,11 @@ public class SortedQueueEntry extends QueueEntryImpl
}
public SortedQueueEntry(final SortedQueueEntryList queueEntryList,
- final ServerMessage message, final long entryId)
+ final ServerMessage message,
+ final long entryId,
+ final MessageEnqueueRecord messageEnqueueRecord)
{
- super(queueEntryList, message, entryId);
+ super(queueEntryList, message, entryId, messageEnqueueRecord);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
index ee850455b0..0c8f5418ee 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.SortedQueueEntry.Colour;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
/**
* A sorted implementation of QueueEntryList.
@@ -51,7 +52,7 @@ public class SortedQueueEntryList implements QueueEntryList
return _queue;
}
- public SortedQueueEntry add(final ServerMessage message)
+ public SortedQueueEntry add(final ServerMessage message, final MessageEnqueueRecord enqueueRecord)
{
synchronized(_lock)
{
@@ -62,7 +63,7 @@ public class SortedQueueEntryList implements QueueEntryList
key = val.toString();
}
- final SortedQueueEntry entry = new SortedQueueEntry(this,message, ++_entryId);
+ final SortedQueueEntry entry = new SortedQueueEntry(this,message, ++_entryId, enqueueRecord);
entry.setKey(key);
insert(entry);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
index 4b6c7a3fe7..44de152776 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
@@ -25,6 +25,7 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -54,11 +55,12 @@ public class SortedQueueImpl extends OutOfOrderQueue<SortedQueueImpl> implements
@Override
protected void doEnqueue(final ServerMessage message,
- final Action<? super MessageInstance> action)
+ final Action<? super MessageInstance> action,
+ MessageEnqueueRecord record)
{
synchronized (_sortedQueueLock)
{
- super.doEnqueue(message, action);
+ super.doEnqueue(message, action, record);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntry.java
index 8669d26110..ea6f62cadb 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntry.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
public class StandardQueueEntry extends OrderedQueueEntry
{
@@ -31,14 +32,9 @@ public class StandardQueueEntry extends OrderedQueueEntry
public StandardQueueEntry(final StandardQueueEntryList queueEntryList,
final ServerMessage message,
- final long entryId)
+ final MessageEnqueueRecord messageEnqueueRecord)
{
- super(queueEntryList, message, entryId);
- }
-
- public StandardQueueEntry(final StandardQueueEntryList queueEntryList, final ServerMessage message)
- {
- super(queueEntryList, message);
+ super(queueEntryList, message, messageEnqueueRecord);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
index 487bac1a43..802e1d72ea 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
public class StandardQueueEntryList extends OrderedQueueEntryList
{
@@ -40,9 +41,10 @@ public class StandardQueueEntryList extends OrderedQueueEntryList
}
- protected StandardQueueEntry createQueueEntry(ServerMessage<?> message)
+ protected StandardQueueEntry createQueueEntry(ServerMessage<?> message,
+ final MessageEnqueueRecord enqueueRecord)
{
- return new StandardQueueEntry(this, message);
+ return new StandardQueueEntry(this, message, enqueueRecord);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index 96d2058c05..c6e6e41465 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -35,14 +35,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -488,11 +485,11 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
@Override
- public StoredMessage addMessage(StorableMessageMetaData metaData)
+ public <T extends StorableMessageMetaData> MessageHandle<T> addMessage(T metaData)
{
checkMessageStoreOpen();
- return new StoredJDBCMessage(getNextMessageId(), metaData);
+ return new StoredJDBCMessage<T>(getNextMessageId(), metaData);
}
@@ -670,18 +667,18 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
- private void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws StoreException
+ private void dequeueMessage(ConnectionWrapper connWrapper, final UUID queueId,
+ Long messageId) throws StoreException
{
Connection conn = connWrapper.getConnection();
-
try
{
PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY);
try
{
- stmt.setString(1, queue.getId().toString());
+ stmt.setString(1, queueId.toString());
stmt.setLong(2, messageId);
int results = stmt.executeUpdate();
@@ -689,14 +686,13 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
if(results != 1)
{
- throw new StoreException("Unable to find message with id " + messageId + " on queue " + queue.getName()
- + " with id " + queue.getId());
+ throw new StoreException("Unable to find message with id " + messageId
+ + " on queue with id " + queueId);
}
if (getLogger().isDebugEnabled())
{
- getLogger().debug("Dequeuing message " + messageId + " on queue " + queue.getName()
- + " with id " + queue.getId());
+ getLogger().debug("Dequeuing message " + messageId + " on queue with id " + queueId);
}
}
finally
@@ -708,8 +704,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
catch (SQLException e)
{
getLogger().error("Failed to dequeue: " + e.getMessage(), e);
- throw new StoreException("Error deleting enqueued message with id " + messageId + " for queue " + queue.getName()
- + " with id " + queue.getId() + " from database", e);
+ throw new StoreException("Error deleting enqueued message with id " + messageId + " for queue with id "
+ + queueId + " from database", e);
}
}
@@ -766,7 +762,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
private List<Runnable> recordXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId,
- Transaction.Record[] enqueues, Transaction.Record[] dequeues) throws StoreException
+ Transaction.EnqueueRecord[] enqueues, Transaction.DequeueRecord[] dequeues) throws StoreException
{
Connection conn = connWrapper.getConnection();
@@ -788,7 +784,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
List<Runnable> postActions = new ArrayList<>();
- for(org.apache.qpid.server.store.Transaction.Record enqueue : enqueues)
+ for(Transaction.EnqueueRecord enqueue : enqueues)
{
StoredMessage storedMessage = enqueue.getMessage().getStoredMessage();
if(storedMessage instanceof StoredJDBCMessage)
@@ -809,7 +805,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
if(enqueues != null)
{
stmt.setString(4, "E");
- for(Transaction.Record record : enqueues)
+ for(Transaction.EnqueueRecord record : enqueues)
{
stmt.setString(5, record.getResource().getId().toString());
stmt.setLong(6, record.getMessage().getMessageNumber());
@@ -820,10 +816,10 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
if(dequeues != null)
{
stmt.setString(4, "D");
- for(Transaction.Record record : dequeues)
+ for(Transaction.DequeueRecord record : dequeues)
{
- stmt.setString(5, record.getResource().getId().toString());
- stmt.setLong(6, record.getMessage().getMessageNumber());
+ stmt.setString(5, record.getEnqueueRecord().getQueueId().toString());
+ stmt.setLong(6, record.getEnqueueRecord().getMessageNumber());
stmt.executeUpdate();
}
}
@@ -1059,9 +1055,10 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
- private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueueableMessage
+ private static class RecordImpl implements Transaction.EnqueueRecord, Transaction.DequeueRecord, TransactionLogResource, EnqueueableMessage
{
+ private final JDBCEnqueueRecord _record;
private long _messageNumber;
private UUID _queueId;
@@ -1069,6 +1066,13 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
{
_messageNumber = messageNumber;
_queueId = queueId;
+ _record = new JDBCEnqueueRecord(queueId, messageNumber);
+ }
+
+ @Override
+ public MessageEnqueueRecord getEnqueueRecord()
+ {
+ return _record;
}
@Override
@@ -1323,7 +1327,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
@Override
- public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
+ public MessageEnqueueRecord enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
{
checkMessageStoreOpen();
@@ -1349,15 +1353,17 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
});
}
AbstractJDBCMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
-
+ return new JDBCEnqueueRecord(queue.getId(), message.getMessageNumber());
}
@Override
- public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
+ public void dequeueMessage(final MessageEnqueueRecord enqueueRecord)
{
checkMessageStoreOpen();
- AbstractJDBCMessageStore.this.dequeueMessage(_connWrapper, queue, message.getMessageNumber());
+ AbstractJDBCMessageStore.this.dequeueMessage(_connWrapper,
+ enqueueRecord.getQueueId(),
+ enqueueRecord.getMessageNumber());
}
@Override
@@ -1408,22 +1414,93 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
@Override
- public void removeXid(long format, byte[] globalId, byte[] branchId)
+ public void removeXid(final StoredXidRecord record)
{
checkMessageStoreOpen();
- AbstractJDBCMessageStore.this.removeXid(_connWrapper, format, globalId, branchId);
+ AbstractJDBCMessageStore.this.removeXid(_connWrapper,
+ record.getFormat(),
+ record.getGlobalId(),
+ record.getBranchId());
}
@Override
- public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+ public StoredXidRecord recordXid(final long format,
+ final byte[] globalId,
+ final byte[] branchId,
+ EnqueueRecord[] enqueues,
+ DequeueRecord[] dequeues)
{
checkMessageStoreOpen();
_postCommitActions.addAll(AbstractJDBCMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues));
+ return new JDBCStoredXidRecord(format, globalId, branchId);
}
+
+
}
+ private static class JDBCStoredXidRecord implements Transaction.StoredXidRecord
+ {
+ private final long _format;
+ private final byte[] _globalId;
+ private final byte[] _branchId;
+
+ public JDBCStoredXidRecord(final long format, final byte[] globalId, final byte[] branchId)
+ {
+ _format = format;
+ _globalId = globalId;
+ _branchId = branchId;
+ }
+
+ @Override
+ public long getFormat()
+ {
+ return _format;
+ }
+
+ @Override
+ public byte[] getGlobalId()
+ {
+ return _globalId;
+ }
+
+ @Override
+ public byte[] getBranchId()
+ {
+ return _branchId;
+ }
+
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final JDBCStoredXidRecord that = (JDBCStoredXidRecord) o;
+
+ return _format == that._format
+ && Arrays.equals(_globalId, that._globalId)
+ && Arrays.equals(_branchId, that._branchId);
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = (int) (_format ^ (_format >>> 32));
+ result = 31 * result + Arrays.hashCode(_globalId);
+ result = 31 * result + Arrays.hashCode(_branchId);
+ return result;
+ }
+ }
static interface MessageDataRef<T extends StorableMessageMetaData>
{
@@ -1540,7 +1617,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
}
- private class StoredJDBCMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
+ private class StoredJDBCMessage<T extends StorableMessageMetaData> implements StoredMessage<T>, MessageHandle<T>
{
private final long _messageId;
@@ -1597,7 +1674,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
@Override
- public void addContent(int offsetInMessage, ByteBuffer src)
+ public void addContent(ByteBuffer src)
{
src = src.slice();
byte[] data = _messageDataRef.getData();
@@ -1622,6 +1699,12 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
@Override
+ public StoredMessage<T> allContentAdded()
+ {
+ return this;
+ }
+
+ @Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
byte[] data = _messageDataRef.getData();
@@ -1809,286 +1892,302 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
@Override
- public StoredMessage<?> getMessage(long messageId) throws StoreException
+ public MessageStoreReader newMessageStoreReader()
{
- checkMessageStoreOpen();
+ return new JDBCMessageStoreReader();
+ }
- Connection conn = null;
- StoredJDBCMessage message;
- try
+ private class JDBCMessageStoreReader implements MessageStoreReader
+ {
+
+ @Override
+ public StoredMessage<?> getMessage(long messageId) throws StoreException
{
- conn = newAutoCommitConnection();
- try (PreparedStatement stmt = conn.prepareStatement(SELECT_ONE_FROM_META_DATA))
+ checkMessageStoreOpen();
+
+ Connection conn = null;
+ StoredJDBCMessage message;
+ try
{
- stmt.setLong(1, messageId);
- try (ResultSet rs = stmt.executeQuery())
+ conn = newAutoCommitConnection();
+ try (PreparedStatement stmt = conn.prepareStatement(SELECT_ONE_FROM_META_DATA))
{
- if (rs.next())
+ stmt.setLong(1, messageId);
+ try (ResultSet rs = stmt.executeQuery())
{
- byte[] dataAsBytes = getBlobAsBytes(rs, 2);
- ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
- buf.position(1);
- buf = buf.slice();
- MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
- StorableMessageMetaData metaData = type.createMetaData(buf);
- message = new StoredJDBCMessage(messageId, metaData, true);
+ if (rs.next())
+ {
+ byte[] dataAsBytes = getBlobAsBytes(rs, 2);
+ ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
+ buf.position(1);
+ buf = buf.slice();
+ MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
+ StorableMessageMetaData metaData = type.createMetaData(buf);
+ message = new StoredJDBCMessage(messageId, metaData, true);
- }
- else
- {
- message = null;
+ }
+ else
+ {
+ message = null;
+ }
}
}
+ return message;
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error encountered when visiting messages", e);
+ }
+ finally
+ {
+ JdbcUtils.closeConnection(conn, getLogger());
}
- return message;
- }
- catch (SQLException e)
- {
- throw new StoreException("Error encountered when visiting messages", e);
}
- finally
+
+ @Override
+ public void close()
{
- JdbcUtils.closeConnection(conn, getLogger());
- }
- }
+ }
- @Override
- public void visitMessages(MessageHandler handler) throws StoreException
- {
- checkMessageStoreOpen();
- Connection conn = null;
- try
+ @Override
+ public void visitMessages(MessageHandler handler) throws StoreException
{
- conn = newAutoCommitConnection();
- Statement stmt = conn.createStatement();
+ checkMessageStoreOpen();
+
+ Connection conn = null;
try
{
- ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA);
+ conn = newAutoCommitConnection();
+ Statement stmt = conn.createStatement();
try
{
- while (rs.next())
+ ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA);
+ try
{
- long messageId = rs.getLong(1);
- byte[] dataAsBytes = getBlobAsBytes(rs, 2);
- ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
- buf.position(1);
- buf = buf.slice();
- MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
- StorableMessageMetaData metaData = type.createMetaData(buf);
- StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
- if (!handler.handle(message))
+ while (rs.next())
{
- break;
+ long messageId = rs.getLong(1);
+ byte[] dataAsBytes = getBlobAsBytes(rs, 2);
+ ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
+ buf.position(1);
+ buf = buf.slice();
+ MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
+ StorableMessageMetaData metaData = type.createMetaData(buf);
+ StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
+ if (!handler.handle(message))
+ {
+ break;
+ }
}
}
+ finally
+ {
+ rs.close();
+ }
}
finally
{
- rs.close();
+ stmt.close();
}
}
+ catch (SQLException e)
+ {
+ throw new StoreException("Error encountered when visiting messages", e);
+ }
finally
{
- stmt.close();
+ JdbcUtils.closeConnection(conn, getLogger());
}
}
- catch (SQLException e)
- {
- throw new StoreException("Error encountered when visiting messages", e);
- }
- finally
- {
- JdbcUtils.closeConnection(conn, getLogger());
- }
- }
- @Override
- public void visitMessageInstances(TransactionLogResource queue, MessageInstanceHandler handler) throws StoreException
- {
- checkMessageStoreOpen();
-
- Connection conn = null;
- try
+ @Override
+ public void visitMessageInstances(TransactionLogResource queue, MessageInstanceHandler handler)
+ throws StoreException
{
- conn = newAutoCommitConnection();
- PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_QUEUE_ENTRY_FOR_QUEUE);
+ checkMessageStoreOpen();
+
+ Connection conn = null;
try
{
- stmt.setString(1, queue.getId().toString());
- ResultSet rs = stmt.executeQuery();
+ conn = newAutoCommitConnection();
+ PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_QUEUE_ENTRY_FOR_QUEUE);
try
{
- while(rs.next())
+ stmt.setString(1, queue.getId().toString());
+ ResultSet rs = stmt.executeQuery();
+ try
{
- String id = rs.getString(1);
- long messageId = rs.getLong(2);
- if (!handler.handle(UUID.fromString(id), messageId))
+ while (rs.next())
{
- break;
+ String id = rs.getString(1);
+ long messageId = rs.getLong(2);
+ if (!handler.handle(new JDBCEnqueueRecord(UUID.fromString(id), messageId)))
+ {
+ break;
+ }
}
}
+ finally
+ {
+ rs.close();
+ }
}
finally
{
- rs.close();
+ stmt.close();
}
}
+ catch (SQLException e)
+ {
+ throw new StoreException("Error encountered when visiting message instances", e);
+ }
finally
{
- stmt.close();
+ JdbcUtils.closeConnection(conn, getLogger());
}
- }
- catch(SQLException e)
- {
- throw new StoreException("Error encountered when visiting message instances", e);
- }
- finally
- {
- JdbcUtils.closeConnection(conn, getLogger());
- }
-
- }
- @Override
- public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
- {
- checkMessageStoreOpen();
+ }
- Connection conn = null;
- try
+ @Override
+ public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
{
- conn = newAutoCommitConnection();
- Statement stmt = conn.createStatement();
+ checkMessageStoreOpen();
+
+ Connection conn = null;
try
{
- ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
+ conn = newAutoCommitConnection();
+ Statement stmt = conn.createStatement();
try
{
- while(rs.next())
+ ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
+ try
{
- String id = rs.getString(1);
- long messageId = rs.getLong(2);
- if (!handler.handle(UUID.fromString(id), messageId))
+ while (rs.next())
{
- break;
+ String id = rs.getString(1);
+ long messageId = rs.getLong(2);
+ if (!handler.handle(new JDBCEnqueueRecord(UUID.fromString(id), messageId)))
+ {
+ break;
+ }
}
}
+ finally
+ {
+ rs.close();
+ }
}
finally
{
- rs.close();
+ stmt.close();
}
}
+ catch (SQLException e)
+ {
+ throw new StoreException("Error encountered when visiting message instances", e);
+ }
finally
{
- stmt.close();
+ JdbcUtils.closeConnection(conn, getLogger());
}
}
- catch(SQLException e)
- {
- throw new StoreException("Error encountered when visiting message instances", e);
- }
- finally
- {
- JdbcUtils.closeConnection(conn, getLogger());
- }
- }
-
- @Override
- public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
- {
- checkMessageStoreOpen();
- Connection conn = null;
- try
+ @Override
+ public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
{
- conn = newAutoCommitConnection();
- List<Xid> xids = new ArrayList<Xid>();
+ checkMessageStoreOpen();
- Statement stmt = conn.createStatement();
+ Connection conn = null;
try
{
- ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS);
+ conn = newAutoCommitConnection();
+ List<Xid> xids = new ArrayList<Xid>();
+
+ Statement stmt = conn.createStatement();
try
{
- while(rs.next())
+ ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS);
+ try
{
+ while (rs.next())
+ {
- long format = rs.getLong(1);
- byte[] globalId = rs.getBytes(2);
- byte[] branchId = rs.getBytes(3);
- xids.add(new Xid(format, globalId, branchId));
+ long format = rs.getLong(1);
+ byte[] globalId = rs.getBytes(2);
+ byte[] branchId = rs.getBytes(3);
+ xids.add(new Xid(format, globalId, branchId));
+ }
+ }
+ finally
+ {
+ rs.close();
}
}
finally
{
- rs.close();
+ stmt.close();
}
- }
- finally
- {
- stmt.close();
- }
-
- for(Xid xid : xids)
- {
- List<RecordImpl> enqueues = new ArrayList<RecordImpl>();
- List<RecordImpl> dequeues = new ArrayList<RecordImpl>();
-
- PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS);
-
- try
+ for (Xid xid : xids)
{
- pstmt.setLong(1, xid.getFormat());
- pstmt.setBytes(2, xid.getGlobalId());
- pstmt.setBytes(3, xid.getBranchId());
+ List<RecordImpl> enqueues = new ArrayList<RecordImpl>();
+ List<RecordImpl> dequeues = new ArrayList<RecordImpl>();
+
+ PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS);
- ResultSet rs = pstmt.executeQuery();
try
{
- while(rs.next())
+ pstmt.setLong(1, xid.getFormat());
+ pstmt.setBytes(2, xid.getGlobalId());
+ pstmt.setBytes(3, xid.getBranchId());
+
+ ResultSet rs = pstmt.executeQuery();
+ try
{
+ while (rs.next())
+ {
- String actionType = rs.getString(1);
- UUID queueId = UUID.fromString(rs.getString(2));
- long messageId = rs.getLong(3);
+ String actionType = rs.getString(1);
+ UUID queueId = UUID.fromString(rs.getString(2));
+ long messageId = rs.getLong(3);
- RecordImpl record = new RecordImpl(queueId, messageId);
- List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues;
- records.add(record);
+ RecordImpl record = new RecordImpl(queueId, messageId);
+ List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues;
+ records.add(record);
+ }
+ }
+ finally
+ {
+ rs.close();
}
}
finally
{
- rs.close();
+ pstmt.close();
}
- }
- finally
- {
- pstmt.close();
- }
- if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
- enqueues.toArray(new RecordImpl[enqueues.size()]),
- dequeues.toArray(new RecordImpl[dequeues.size()])))
- {
- break;
+ if (!handler.handle(new JDBCStoredXidRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId()),
+ enqueues.toArray(new RecordImpl[enqueues.size()]),
+ dequeues.toArray(new RecordImpl[dequeues.size()])))
+ {
+ break;
+ }
}
- }
- }
- catch (SQLException e)
- {
- throw new StoreException("Error encountered when visiting distributed transactions", e);
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error encountered when visiting distributed transactions", e);
- }
- finally
- {
- JdbcUtils.closeConnection(conn, getLogger());
+ }
+ finally
+ {
+ JdbcUtils.closeConnection(conn, getLogger());
+ }
}
}
@@ -2135,4 +2234,26 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
+ private static class JDBCEnqueueRecord implements MessageEnqueueRecord
+ {
+ private final UUID _queueId;
+ private final long _messageNumber;
+
+ public JDBCEnqueueRecord(final UUID queueId,
+ final long messageNumber)
+ {
+ _queueId = queueId;
+ _messageNumber = messageNumber;
+ }
+
+ public UUID getQueueId()
+ {
+ return _queueId;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageNumber;
+ }
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index eb887b4ef5..650602e23c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.store;
import java.io.File;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -65,7 +66,7 @@ public class MemoryMessageStore implements MessageStore
}
@Override
- public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
+ public MessageEnqueueRecord enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
{
if(message.getStoredMessage() instanceof StoredMemoryMessage)
@@ -80,18 +81,24 @@ public class MemoryMessageStore implements MessageStore
_localEnqueueMap.put(queue.getId(), messageIds);
}
messageIds.add(message.getMessageNumber());
+ return new MemoryEnqueueRecord(queue.getId(), message.getMessageNumber());
}
@Override
- public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
+ public void dequeueMessage(final MessageEnqueueRecord enqueueRecord)
{
- Set<Long> messageIds = _localDequeueMap.get(queue.getId());
+ dequeueMessage(enqueueRecord.getQueueId(), enqueueRecord.getMessageNumber());
+ }
+
+ private void dequeueMessage(final UUID queueId, final long messageNumber)
+ {
+ Set<Long> messageIds = _localDequeueMap.get(queueId);
if (messageIds == null)
{
messageIds = new HashSet<Long>();
- _localDequeueMap.put(queue.getId(), messageIds);
+ _localDequeueMap.put(queueId, messageIds);
}
- messageIds.add(message.getMessageNumber());
+ messageIds.add(messageNumber);
}
@Override
@@ -110,36 +117,106 @@ public class MemoryMessageStore implements MessageStore
}
@Override
- public void removeXid(long format, byte[] globalId, byte[] branchId)
+ public void removeXid(final StoredXidRecord record)
{
- _localDistributedTransactionsRemoves.add(new Xid(format, globalId, branchId));
+ _localDistributedTransactionsRemoves.add(new Xid(record.getFormat(),
+ record.getGlobalId(),
+ record.getBranchId()));
}
@Override
- public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+ public StoredXidRecord recordXid(final long format,
+ final byte[] globalId,
+ final byte[] branchId,
+ EnqueueRecord[] enqueues,
+ DequeueRecord[] dequeues)
{
_localDistributedTransactionsRecords.put(new Xid(format, globalId, branchId), new DistributedTransactionRecords(enqueues, dequeues));
+ return new MemoryStoredXidRecord(format, globalId, branchId);
}
+
+
}
+ private static class MemoryStoredXidRecord implements Transaction.StoredXidRecord
+ {
+ private final long _format;
+ private final byte[] _globalId;
+ private final byte[] _branchId;
+
+ public MemoryStoredXidRecord(final long format, final byte[] globalId, final byte[] branchId)
+ {
+ _format = format;
+ _globalId = globalId;
+ _branchId = branchId;
+ }
+
+ @Override
+ public long getFormat()
+ {
+ return _format;
+ }
+
+ @Override
+ public byte[] getGlobalId()
+ {
+ return _globalId;
+ }
+
+ @Override
+ public byte[] getBranchId()
+ {
+ return _branchId;
+ }
+
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final MemoryStoredXidRecord that = (MemoryStoredXidRecord) o;
+
+ return _format == that._format
+ && Arrays.equals(_globalId, that._globalId)
+ && Arrays.equals(_branchId, that._branchId);
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = (int) (_format ^ (_format >>> 32));
+ result = 31 * result + Arrays.hashCode(_globalId);
+ result = 31 * result + Arrays.hashCode(_branchId);
+ return result;
+ }
+ }
private static final class DistributedTransactionRecords
{
- private Transaction.Record[] _enqueues;
- private Transaction.Record[] _dequeues;
+ private Transaction.EnqueueRecord[] _enqueues;
+ private Transaction.DequeueRecord[] _dequeues;
- public DistributedTransactionRecords(Transaction.Record[] enqueues, Transaction.Record[] dequeues)
+ public DistributedTransactionRecords(Transaction.EnqueueRecord[] enqueues, Transaction.DequeueRecord[] dequeues)
{
super();
_enqueues = enqueues;
_dequeues = dequeues;
}
- public Transaction.Record[] getEnqueues()
+ public Transaction.EnqueueRecord[] getEnqueues()
{
return _enqueues;
}
- public Transaction.Record[] getDequeues()
+ public Transaction.DequeueRecord[] getDequeues()
{
return _dequeues;
}
@@ -201,7 +278,7 @@ public class MemoryMessageStore implements MessageStore
}
@Override
- public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData)
+ public <T extends StorableMessageMetaData> MessageHandle<T> addMessage(final T metaData)
{
long id = getNextMessageId();
@@ -273,81 +350,121 @@ public class MemoryMessageStore implements MessageStore
}
@Override
- public void visitMessages(final MessageHandler handler) throws StoreException
+ public MessageStoreReader newMessageStoreReader()
{
- for (StoredMemoryMessage message : _messages.values())
- {
- if(!handler.handle(message))
- {
- break;
- }
- }
+ return new MemoryMessageStoreReader();
}
- @Override
- public StoredMessage<?> getMessage(final long messageId)
+
+ private static class MemoryEnqueueRecord implements MessageEnqueueRecord
{
- return _messages.get(messageId);
+ private final UUID _queueId;
+ private final long _messageNumber;
+
+ public MemoryEnqueueRecord(final UUID queueId,
+ final long messageNumber)
+ {
+ _queueId = queueId;
+ _messageNumber = messageNumber;
+ }
+
+ public UUID getQueueId()
+ {
+ return _queueId;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageNumber;
+ }
}
- @Override
- public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException
+ private class MemoryMessageStoreReader implements MessageStoreReader
{
- synchronized (_transactionLock)
+ @Override
+ public StoredMessage<?> getMessage(final long messageId)
+ {
+ return _messages.get(messageId);
+ }
+
+ @Override
+ public void close()
{
- for (Map.Entry<UUID, Set<Long>> enqueuedEntry : _messageInstances.entrySet())
+
+ }
+
+ @Override
+ public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException
+ {
+ synchronized (_transactionLock)
{
- UUID resourceId = enqueuedEntry.getKey();
- for (Long messageId : enqueuedEntry.getValue())
+ for (Map.Entry<UUID, Set<Long>> enqueuedEntry : _messageInstances.entrySet())
{
- if (!handler.handle(resourceId, messageId))
+ UUID resourceId = enqueuedEntry.getKey();
+ for (Long messageId : enqueuedEntry.getValue())
{
- return;
+ if (!handler.handle(new MemoryEnqueueRecord(resourceId, messageId)))
+ {
+ return;
+ }
}
}
}
}
- }
- @Override
- public void visitMessageInstances(TransactionLogResource queue, final MessageInstanceHandler handler) throws StoreException
- {
- synchronized (_transactionLock)
+ @Override
+ public void visitMessageInstances(TransactionLogResource queue, final MessageInstanceHandler handler) throws StoreException
{
- Set<Long> ids = _messageInstances.get(queue.getId());
- if(ids != null)
+ synchronized (_transactionLock)
{
- for (long id : ids)
+ Set<Long> ids = _messageInstances.get(queue.getId());
+ if(ids != null)
{
- if (!handler.handle(queue.getId(), id))
+ for (long id : ids)
{
- return;
- }
+ if (!handler.handle(new MemoryEnqueueRecord(queue.getId(), id)))
+ {
+ return;
+ }
+ }
}
}
}
- }
- @Override
- public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException
- {
- synchronized (_transactionLock)
+ @Override
+ public void visitMessages(final MessageHandler handler) throws StoreException
{
- for (Map.Entry<Xid, DistributedTransactionRecords> entry : _distributedTransactions.entrySet())
+ for (StoredMemoryMessage message : _messages.values())
{
- Xid xid = entry.getKey();
- DistributedTransactionRecords records = entry.getValue();
- if (!handler.handle(xid.getFormat(),
- xid.getGlobalId(),
- xid.getBranchId(),
- records.getEnqueues(),
- records.getDequeues()))
+ if(!handler.handle(message))
{
break;
}
}
}
+
+ @Override
+ public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException
+ {
+ synchronized (_transactionLock)
+ {
+ for (Map.Entry<Xid, DistributedTransactionRecords> entry : _distributedTransactions.entrySet())
+ {
+ Xid xid = entry.getKey();
+ DistributedTransactionRecords records = entry.getValue();
+ if (!handler.handle(new MemoryStoredXidRecord(xid.getFormat(),
+ xid.getGlobalId(),
+ xid.getBranchId()),
+ records.getEnqueues(),
+ records.getDequeues()))
+ {
+ break;
+ }
+ }
+ }
+ }
+
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageEnqueueRecord.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageEnqueueRecord.java
new file mode 100644
index 0000000000..e48c50b13a
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageEnqueueRecord.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.UUID;
+
+public interface MessageEnqueueRecord
+{
+ UUID getQueueId();
+ long getMessageNumber();
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageHandle.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageHandle.java
new file mode 100644
index 0000000000..151a819dee
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageHandle.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.nio.ByteBuffer;
+
+public interface MessageHandle<M extends StorableMessageMetaData>
+{
+
+ void addContent(ByteBuffer src);
+
+ StoredMessage<M> allContentAdded();
+
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
index 28a330b205..8439f85712 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store;
import java.io.File;
+import java.util.UUID;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
@@ -59,15 +60,7 @@ public interface MessageStore
*/
void upgradeStoreStructure() throws StoreException;
- void visitMessages(MessageHandler handler) throws StoreException;
-
- void visitMessageInstances(MessageInstanceHandler handler) throws StoreException;
- void visitMessageInstances(TransactionLogResource queue, MessageInstanceHandler handler) throws StoreException;
-
- void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException;
-
- <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData);
- StoredMessage<?> getMessage(long messageId);
+ <T extends StorableMessageMetaData> MessageHandle<T> addMessage(T metaData);
/**
* Is this store capable of persisting the data
@@ -84,4 +77,20 @@ public interface MessageStore
void closeMessageStore();
void onDelete(ConfiguredObject<?> parent);
+
+ MessageStoreReader newMessageStoreReader();
+
+ interface MessageStoreReader
+ {
+ void visitMessages(MessageHandler handler) throws StoreException;
+
+ void visitMessageInstances(MessageInstanceHandler handler) throws StoreException;
+ void visitMessageInstances(TransactionLogResource queue, MessageInstanceHandler handler) throws StoreException;
+
+ void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException;
+
+ StoredMessage<?> getMessage(long messageId);
+ void close();
+ }
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
index 83d61fc3fe..b8ca8b0834 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -28,7 +28,7 @@ import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
-public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore, MessageStoreProvider
+public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore, MessageStoreProvider, MessageStore.MessageStoreReader
{
private ConfiguredObjectRecord[] _initialRecords;
@@ -89,7 +89,7 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
}
@Override
- public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
+ public <T extends StorableMessageMetaData> MessageHandle<T> addMessage(T metaData)
{
return null;
}
@@ -145,6 +145,7 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
handler.end();
}
+
@Override
public void visitMessages(MessageHandler handler) throws StoreException
{
@@ -176,4 +177,16 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
{
return null;
}
+
+ @Override
+ public MessageStoreReader newMessageStoreReader()
+ {
+ return this;
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index e8402c9268..698cf5b9e6 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -23,7 +23,9 @@ package org.apache.qpid.server.store;
import java.nio.ByteBuffer;
-public class StoredMemoryMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
+import org.apache.qpid.transport.util.Functions;
+
+public class StoredMemoryMessage<T extends StorableMessageMetaData> implements StoredMessage<T>, MessageHandle<T>
{
private final long _messageNumber;
private ByteBuffer _content;
@@ -40,48 +42,45 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
return _messageNumber;
}
- public void addContent(int offsetInMessage, ByteBuffer src)
+ public void addContent(ByteBuffer src)
{
if(_content == null)
{
- if(offsetInMessage == 0)
- {
- _content = src.slice();
- }
- else
- {
- final int contentSize = _metaData.getContentSize();
- int size = (contentSize < offsetInMessage + src.remaining())
- ? offsetInMessage + src.remaining()
- : contentSize;
- _content = ByteBuffer.allocate(size);
- addContent(offsetInMessage, src);
- }
+ _content = src.slice();
+ _content.position(_content.limit());
}
else
{
- if(_content.limit() >= offsetInMessage + src.remaining())
+ if(_content.remaining() >= src.remaining())
{
- _content.position(offsetInMessage);
- _content.put(src);
- _content.position(0);
+ _content.put(src.duplicate());
}
else
{
final int contentSize = _metaData.getContentSize();
- int size = (contentSize < offsetInMessage + src.remaining())
- ? offsetInMessage + src.remaining()
+ int size = (contentSize < _content.position() + src.remaining())
+ ? _content.position() + src.remaining()
: contentSize;
ByteBuffer oldContent = _content;
+ oldContent.flip();
_content = ByteBuffer.allocate(size);
_content.put(oldContent);
- _content.position(0);
- addContent(offsetInMessage, src);
+ _content.put(src.duplicate());
}
}
}
+ @Override
+ public StoredMessage<T> allContentAdded()
+ {
+ if(_content != null)
+ {
+ _content.flip();
+ }
+ return this;
+ }
+
public int getContent(int offset, ByteBuffer dst)
{
if(_content == null)
@@ -142,4 +141,5 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
{
return false;
}
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
index 7561b4a11c..e71061ba9d 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
@@ -28,8 +28,6 @@ public interface StoredMessage<M extends StorableMessageMetaData>
public long getMessageNumber();
- void addContent(int offsetInMessage, ByteBuffer src);
-
int getContent(int offsetInMessage, ByteBuffer dst);
ByteBuffer getContent(int offsetInMessage, int size);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
index 007f3ab796..4ea751a663 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
@@ -25,24 +25,17 @@ import org.apache.qpid.server.util.FutureResult;
public interface Transaction
{
+
/**
* Places a message onto a specified queue, in a given transactional context.
*
*
- *
- * @param queue The queue to place the message on.
+ * @param queue The queue to place the message on.
* @param message
*/
- void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message);
-
- /**
- * Extracts a message from a specified queue, in a given transactional context.
- *
- * @param queue The queue to place the message on.
- * @param message The message to dequeue.
- */
- void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message);
+ MessageEnqueueRecord enqueueMessage(TransactionLogResource queue, EnqueueableMessage message);
+ void dequeueMessage(MessageEnqueueRecord enqueueRecord);
/**
* Commits all operations performed within a given transactional context.
@@ -63,14 +56,28 @@ public interface Transaction
void abortTran();
- public static interface Record
+ interface EnqueueRecord
{
TransactionLogResource getResource();
EnqueueableMessage getMessage();
}
- void removeXid(long format, byte[] globalId, byte[] branchId);
+ interface DequeueRecord
+ {
+ MessageEnqueueRecord getEnqueueRecord();
+ }
+
+ void removeXid(StoredXidRecord record);
+
+
+ StoredXidRecord recordXid(long format, byte[] globalId, byte[] branchId, EnqueueRecord[] enqueues,
+ DequeueRecord[] dequeues);
- void recordXid(long format, byte[] globalId, byte[] branchId, Transaction.Record[] enqueues,
- Transaction.Record[] dequeues);
+ interface StoredXidRecord
+ {
+ long getFormat();
+ byte[] getGlobalId();
+ byte[] getBranchId();
+
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/DistributedTransactionHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/DistributedTransactionHandler.java
index 733c93355b..96b2702a20 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/DistributedTransactionHandler.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/DistributedTransactionHandler.java
@@ -20,11 +20,13 @@
*/
package org.apache.qpid.server.store.handler;
-import org.apache.qpid.server.store.Transaction.Record;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.Transaction.DequeueRecord;
+import org.apache.qpid.server.store.Transaction.EnqueueRecord;
public interface DistributedTransactionHandler
{
- boolean handle(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues);
+ boolean handle(Transaction.StoredXidRecord storedXid, EnqueueRecord[] enqueues, DequeueRecord[] dequeues);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageInstanceHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageInstanceHandler.java
index 3775ec4fee..ea88468d3e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageInstanceHandler.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageInstanceHandler.java
@@ -20,10 +20,11 @@
*/
package org.apache.qpid.server.store.handler;
-import java.util.UUID;
+
+import org.apache.qpid.server.store.MessageEnqueueRecord;
public interface MessageInstanceHandler
{
- boolean handle(UUID queueId, long messageId);
+ boolean handle(MessageEnqueueRecord record);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
index 97e544d566..8b36ebce1b 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
@@ -28,8 +28,8 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.Transaction;
@@ -88,21 +88,21 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
- public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
+ public void dequeue(MessageEnqueueRecord record, Action postTransactionAction)
{
Transaction txn = null;
try
{
FutureResult future;
- if(queue.getMessageDurability().persist(message.isPersistent()))
+ if(record != null)
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName());
+ _logger.debug("Dequeue of message number " + record.getMessageNumber() + " from transaction log. Queue : " + record.getQueueId());
}
txn = _messageStore.newTransaction();
- txn.dequeueMessage(queue, message);
+ txn.dequeueMessage(record);
future = txn.commitTranAsync();
txn = null;
@@ -121,6 +121,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
+
private void addFuture(final FutureResult future, final Action action)
{
if(action != null)
@@ -160,14 +161,13 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
{
for(MessageInstance entry : queueEntries)
{
- ServerMessage message = entry.getMessage();
- TransactionLogResource queue = entry.getOwningResource();
+ MessageEnqueueRecord record = entry.getEnqueueRecord();
- if(queue.getMessageDurability().persist(message.isPersistent()))
+ if(record != null)
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName());
+ _logger.debug("Dequeue of message number " + record.getMessageNumber() + " from transaction log. Queue : " + record.getQueueId());
}
if(txn == null)
@@ -175,7 +175,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
txn = _messageStore.newTransaction();
}
- txn.dequeueMessage(queue, message);
+ txn.dequeueMessage(record);
}
}
@@ -200,12 +200,13 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
- public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
+ public void enqueue(TransactionLogResource queue, EnqueueableMessage message, EnqueueAction postTransactionAction)
{
Transaction txn = null;
try
{
FutureResult future;
+ final MessageEnqueueRecord enqueueRecord;
if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
@@ -214,30 +215,65 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
txn = _messageStore.newTransaction();
- txn.enqueueMessage(queue, message);
+ enqueueRecord = txn.enqueueMessage(queue, message);
future = txn.commitTranAsync();
txn = null;
}
else
{
future = FutureResult.IMMEDIATE_FUTURE;
+ enqueueRecord = null;
}
- addEnqueueFuture(future, postTransactionAction, message.isPersistent());
+ final EnqueueAction underlying = postTransactionAction;
+ addEnqueueFuture(future, new Action()
+ {
+ @Override
+ public void postCommit()
+ {
+ underlying.postCommit(enqueueRecord);
+ }
+
+ @Override
+ public void onRollback()
+ {
+ underlying.postCommit(enqueueRecord);
+ }
+ }, message.isPersistent());
postTransactionAction = null;
- }finally
+ }
+ finally
{
- rollbackIfNecessary(postTransactionAction, txn);
+ final EnqueueAction underlying = postTransactionAction;
+
+ rollbackIfNecessary(new Action()
+ {
+ @Override
+ public void postCommit()
+ {
+
+ }
+
+ @Override
+ public void onRollback()
+ {
+ if(underlying != null)
+ {
+ underlying.onRollback();
+ }
+ }
+ }, txn);
}
}
- public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message, Action postTransactionAction)
+ public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message, EnqueueAction postTransactionAction)
{
Transaction txn = null;
try
{
-
+ final MessageEnqueueRecord[] records = new MessageEnqueueRecord[queues.size()];
+ int i = 0;
for(BaseQueue queue : queues)
{
if (queue.getMessageDurability().persist(message.isPersistent()))
@@ -250,10 +286,11 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
{
txn = _messageStore.newTransaction();
}
- txn.enqueueMessage(queue, message);
+ records[i] = txn.enqueueMessage(queue, message);
}
+ i++;
}
FutureResult future;
@@ -266,13 +303,49 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
{
future = FutureResult.IMMEDIATE_FUTURE;
}
- addEnqueueFuture(future, postTransactionAction, message.isPersistent());
+ final EnqueueAction underlying = postTransactionAction;
+ addEnqueueFuture(future, new Action()
+ {
+ @Override
+ public void postCommit()
+ {
+ if(underlying != null)
+ {
+ underlying.postCommit(records);
+ }
+ }
+
+ @Override
+ public void onRollback()
+ {
+ underlying.onRollback();
+ }
+ }, message.isPersistent());
postTransactionAction = null;
- }finally
+ }
+ finally
{
- rollbackIfNecessary(postTransactionAction, txn);
+ final EnqueueAction underlying = postTransactionAction;
+
+ rollbackIfNecessary(new Action()
+ {
+ @Override
+ public void postCommit()
+ {
+
+ }
+
+ @Override
+ public void onRollback()
+ {
+ if(underlying != null)
+ {
+ underlying.onRollback();
+ }
+ }
+ }, txn);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
index 81db5b0358..9c06c1f273 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
@@ -28,8 +28,8 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -73,20 +73,20 @@ public class AutoCommitTransaction implements ServerTransaction
immediateAction.postCommit();
}
- public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
+ public void dequeue(MessageEnqueueRecord record, Action postTransactionAction)
{
Transaction txn = null;
try
{
- if(queue.getMessageDurability().persist(message.isPersistent()))
+ if(record != null)
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName());
+ _logger.debug("Dequeue of message number " + record.getMessageNumber() + " from transaction log. Queue : " + record.getQueueId());
}
txn = _messageStore.newTransaction();
- txn.dequeueMessage(queue, message);
+ txn.dequeueMessage(record);
txn.commitTran();
txn = null;
}
@@ -100,6 +100,7 @@ public class AutoCommitTransaction implements ServerTransaction
}
+
public void dequeue(Collection<MessageInstance> queueEntries, Action postTransactionAction)
{
Transaction txn = null;
@@ -107,14 +108,12 @@ public class AutoCommitTransaction implements ServerTransaction
{
for(MessageInstance entry : queueEntries)
{
- ServerMessage message = entry.getMessage();
- TransactionLogResource queue = entry.getOwningResource();
-
- if(queue.getMessageDurability().persist(message.isPersistent()))
+ MessageEnqueueRecord enqueueRecord = entry.getEnqueueRecord();
+ if(enqueueRecord != null)
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName());
+ _logger.debug("Dequeue of message number " + enqueueRecord.getMessageNumber() + " from transaction log. Queue : " + enqueueRecord.getQueueId());
}
if(txn == null)
@@ -122,7 +121,7 @@ public class AutoCommitTransaction implements ServerTransaction
txn = _messageStore.newTransaction();
}
- txn.dequeueMessage(queue, message);
+ txn.dequeueMessage(enqueueRecord);
}
}
@@ -142,11 +141,12 @@ public class AutoCommitTransaction implements ServerTransaction
}
- public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
+ public void enqueue(TransactionLogResource queue, EnqueueableMessage message, EnqueueAction postTransactionAction)
{
Transaction txn = null;
try
{
+ final MessageEnqueueRecord record;
if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
@@ -155,27 +155,52 @@ public class AutoCommitTransaction implements ServerTransaction
}
txn = _messageStore.newTransaction();
- txn.enqueueMessage(queue, message);
+ record = txn.enqueueMessage(queue, message);
txn.commitTran();
txn = null;
}
- postTransactionAction.postCommit();
+ else
+ {
+ record = null;
+ }
+ if(postTransactionAction != null)
+ {
+ postTransactionAction.postCommit(record);
+ }
postTransactionAction = null;
}
finally
{
- rollbackIfNecessary(postTransactionAction, txn);
+ final EnqueueAction underlying = postTransactionAction;
+ rollbackIfNecessary(new Action()
+ {
+ @Override
+ public void postCommit()
+ {
+
+ }
+
+ @Override
+ public void onRollback()
+ {
+ if(underlying != null)
+ {
+ underlying.onRollback();
+ }
+ }
+ }, txn);
}
}
- public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message, Action postTransactionAction)
+ public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message, EnqueueAction postTransactionAction)
{
Transaction txn = null;
try
{
-
+ MessageEnqueueRecord[] enqueueRecords = new MessageEnqueueRecord[queues.size()];
+ int i = 0;
for(BaseQueue queue : queues)
{
if (queue.getMessageDurability().persist(message.isPersistent()))
@@ -188,11 +213,11 @@ public class AutoCommitTransaction implements ServerTransaction
{
txn = _messageStore.newTransaction();
}
- txn.enqueueMessage(queue, message);
+ enqueueRecords[i] = txn.enqueueMessage(queue, message);
}
-
+ i++;
}
if (txn != null)
{
@@ -200,13 +225,34 @@ public class AutoCommitTransaction implements ServerTransaction
txn = null;
}
- postTransactionAction.postCommit();
+ if(postTransactionAction != null)
+ {
+ postTransactionAction.postCommit(enqueueRecords);
+ }
postTransactionAction = null;
- }finally
+ }
+ finally
{
- rollbackIfNecessary(postTransactionAction, txn);
+ final EnqueueAction underlying = postTransactionAction;
+ rollbackIfNecessary(new Action()
+ {
+ @Override
+ public void postCommit()
+ {
+
+ }
+
+ @Override
+ public void onRollback()
+ {
+ if(underlying != null)
+ {
+ underlying.onRollback();
+ }
+ }
+ }, txn);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
index cb713f671f..28b1cc2117 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
@@ -25,6 +25,7 @@ import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -74,26 +75,27 @@ public class DistributedTransaction implements ServerTransaction
}
}
- public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
+ public void dequeue(MessageEnqueueRecord record, Action postTransactionAction)
{
if(_branch != null)
{
- _branch.dequeue(queue, message);
+ _branch.dequeue(record);
_branch.addPostTransactionAction(postTransactionAction);
}
else
{
- _autoCommitTransaction.dequeue(queue, message, postTransactionAction);
+ _autoCommitTransaction.dequeue(record, postTransactionAction);
}
}
+
public void dequeue(Collection<MessageInstance> messages, Action postTransactionAction)
{
if(_branch != null)
{
for(MessageInstance entry : messages)
{
- _branch.dequeue(entry.getOwningResource(), entry.getMessage());
+ _branch.dequeue(entry.getEnqueueRecord());
}
_branch.addPostTransactionAction(postTransactionAction);
}
@@ -103,12 +105,33 @@ public class DistributedTransaction implements ServerTransaction
}
}
- public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
+ public void enqueue(TransactionLogResource queue, EnqueueableMessage message, final EnqueueAction postTransactionAction)
{
if(_branch != null)
{
- _branch.enqueue(queue, message);
- _branch.addPostTransactionAction(postTransactionAction);
+ final MessageEnqueueRecord[] enqueueRecords = new MessageEnqueueRecord[1];
+ _branch.enqueue(queue, message, new org.apache.qpid.server.util.Action<MessageEnqueueRecord>()
+ {
+ @Override
+ public void performAction(final MessageEnqueueRecord record)
+ {
+ enqueueRecords[0] = record;
+ }
+ });
+ addPostTransactionAction(new Action()
+ {
+ @Override
+ public void postCommit()
+ {
+ postTransactionAction.postCommit(enqueueRecords);
+ }
+
+ @Override
+ public void onRollback()
+ {
+ postTransactionAction.onRollback();
+ }
+ });
}
else
{
@@ -117,15 +140,39 @@ public class DistributedTransaction implements ServerTransaction
}
public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message,
- Action postTransactionAction)
+ final EnqueueAction postTransactionAction)
{
if(_branch != null)
{
+ final MessageEnqueueRecord[] enqueueRecords = new MessageEnqueueRecord[queues.size()];
+ int i = 0;
for(BaseQueue queue : queues)
{
- _branch.enqueue(queue, message);
+ final int pos = i;
+ _branch.enqueue(queue, message, new org.apache.qpid.server.util.Action<MessageEnqueueRecord>()
+ {
+ @Override
+ public void performAction(final MessageEnqueueRecord record)
+ {
+ enqueueRecords[pos] = record;
+ }
+ });
+ i++;
}
- _branch.addPostTransactionAction(postTransactionAction);
+ addPostTransactionAction(new Action()
+ {
+ @Override
+ public void postCommit()
+ {
+ postTransactionAction.postCommit(enqueueRecords);
+ }
+
+ @Override
+ public void onRollback()
+ {
+ postTransactionAction.onRollback();
+ }
+ });
}
else
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
index 15b66a90a5..a3521f410c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
import org.slf4j.Logger;
@@ -32,10 +33,14 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.Xid;
@@ -48,14 +53,15 @@ public class DtxBranch
private State _state = State.ACTIVE;
private long _timeout;
private Map<AMQSessionModel, State> _associatedSessions = new HashMap<AMQSessionModel, State>();
- private final List<Record> _enqueueRecords = new ArrayList<Record>();
- private final List<Record> _dequeueRecords = new ArrayList<Record>();
+ private final List<EnqueueRecord> _enqueueRecords = new ArrayList<>();
+ private final List<DequeueRecord> _dequeueRecords = new ArrayList<>();
private Transaction _transaction;
private long _expiration;
private VirtualHostImpl _vhost;
private ScheduledFuture<?> _timeoutFuture;
private MessageStore _store;
+ private Transaction.StoredXidRecord _storedXidRecord;
public enum State
@@ -77,6 +83,12 @@ public class DtxBranch
_vhost = vhost;
}
+ public DtxBranch(Transaction.StoredXidRecord storedXidRecord, MessageStore store, VirtualHostImpl vhost)
+ {
+ this(new Xid(storedXidRecord.getFormat(), storedXidRecord.getGlobalId(), storedXidRecord.getBranchId()), store, vhost);
+ _storedXidRecord = storedXidRecord;
+ }
+
public Xid getXid()
{
return _xid;
@@ -227,11 +239,11 @@ public class DtxBranch
}
Transaction txn = _store.newTransaction();
- txn.recordXid(_xid.getFormat(),
+ _storedXidRecord = txn.recordXid(_xid.getFormat(),
_xid.getGlobalId(),
_xid.getBranchId(),
- _enqueueRecords.toArray(new Record[_enqueueRecords.size()]),
- _dequeueRecords.toArray(new Record[_dequeueRecords.size()]));
+ _enqueueRecords.toArray(new EnqueueRecord[_enqueueRecords.size()]),
+ _dequeueRecords.toArray(new DequeueRecord[_dequeueRecords.size()]));
txn.commitTran();
prePrepareTransaction();
@@ -266,7 +278,7 @@ public class DtxBranch
// prepare has previously been called
Transaction txn = _store.newTransaction();
- txn.removeXid(_xid.getFormat(), _xid.getGlobalId(), _xid.getBranchId());
+ txn.removeXid(_storedXidRecord);
txn.commitTran();
_transaction.abortTran();
@@ -309,7 +321,7 @@ public class DtxBranch
}
else
{
- _transaction.removeXid(_xid.getFormat(), _xid.getGlobalId(), _xid.getBranchId());
+ _transaction.removeXid(_storedXidRecord);
}
_transaction.commitTran();
@@ -324,21 +336,25 @@ public class DtxBranch
{
_transaction = _store.newTransaction();
- for(Record enqueue : _enqueueRecords)
+ for(final EnqueueRecord enqueue : _enqueueRecords)
{
+ final MessageEnqueueRecord record;
if(enqueue.isDurable())
{
- _transaction.enqueueMessage(enqueue.getResource(), enqueue.getMessage());
+ record = _transaction.enqueueMessage(enqueue.getResource(), enqueue.getMessage());
+
}
+ else
+ {
+ record = null;
+ }
+ enqueue.getEnqueueAction().performAction(record);
}
- for(Record enqueue : _dequeueRecords)
+ for(DequeueRecord dequeue : _dequeueRecords)
{
- if(enqueue.isDurable())
- {
- _transaction.dequeueMessage(enqueue.getResource(), enqueue.getMessage());
- }
+ _transaction.dequeueMessage(dequeue.getEnqueueRecord());
}
}
@@ -349,28 +365,58 @@ public class DtxBranch
}
- public void dequeue(TransactionLogResource resource, EnqueueableMessage message)
+ public void dequeue(MessageEnqueueRecord record)
{
- _dequeueRecords.add(new Record(resource, message));
+ if(record != null)
+ {
+ _dequeueRecords.add(new DequeueRecord(record));
+ }
}
+ public void enqueue(TransactionLogResource queue,
+ EnqueueableMessage message,
+ final Action<MessageEnqueueRecord> enqueueAction)
+ {
+ _enqueueRecords.add(new EnqueueRecord(queue, message, enqueueAction));
+ }
- public void enqueue(TransactionLogResource queue, EnqueueableMessage message)
+ private static class DequeueRecord implements Transaction.DequeueRecord
{
- _enqueueRecords.add(new Record(queue, message));
+ private final MessageEnqueueRecord _enqueueRecord;
+
+ public DequeueRecord(MessageEnqueueRecord enqueueRecord)
+ {
+ _enqueueRecord = enqueueRecord;
+ }
+
+ public MessageEnqueueRecord getEnqueueRecord()
+ {
+ return _enqueueRecord;
+ }
+
+
}
- private static final class Record implements Transaction.Record
+ private static class EnqueueRecord implements Transaction.EnqueueRecord
{
private final TransactionLogResource _resource;
private final EnqueueableMessage _message;
- public Record(TransactionLogResource resource, EnqueueableMessage message)
+ private final Action<MessageEnqueueRecord> _enqueueAction;
+
+ public EnqueueRecord(final TransactionLogResource resource,
+ final EnqueueableMessage message,
+ final Action<MessageEnqueueRecord> enqueueAction)
{
_resource = resource;
_message = message;
+ _enqueueAction = enqueueAction;
}
+ public Action<MessageEnqueueRecord> getEnqueueAction()
+ {
+ return _enqueueAction;
+ }
public TransactionLogResource getResource()
{
return _resource;
@@ -385,6 +431,7 @@ public class DtxBranch
{
return _resource.getMessageDurability().persist(_message.isPersistent());
}
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index b800556312..541b60e576 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -30,8 +30,8 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.Transaction;
@@ -92,23 +92,23 @@ public class LocalTransaction implements ServerTransaction
_postTransactionActions.add(postTransactionAction);
}
- public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
+ public void dequeue(MessageEnqueueRecord record, Action postTransactionAction)
{
sync();
_postTransactionActions.add(postTransactionAction);
initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
- if(queue.getMessageDurability().persist(message.isPersistent()))
+ if(record != null)
{
try
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName());
+ _logger.debug("Dequeue of message number " + record.getMessageNumber() + " from transaction log. Queue : " + record.getQueueId());
}
beginTranIfNecessary();
- _transaction.dequeueMessage(queue, message);
+ _transaction.dequeueMessage(record);
}
catch(RuntimeException e)
{
@@ -127,18 +127,16 @@ public class LocalTransaction implements ServerTransaction
{
for(MessageInstance entry : queueEntries)
{
- ServerMessage message = entry.getMessage();
- TransactionLogResource queue = entry.getOwningResource();
-
- if(queue.getMessageDurability().persist(message.isPersistent()))
+ final MessageEnqueueRecord record = entry.getEnqueueRecord();
+ if(record != null)
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getName());
+ _logger.debug("Dequeue of message number " + record.getMessageNumber() + " from transaction log. Queue : " + record.getQueueId());
}
beginTranIfNecessary();
- _transaction.dequeueMessage(queue, message);
+ _transaction.dequeueMessage(record);
}
}
@@ -181,10 +179,9 @@ public class LocalTransaction implements ServerTransaction
}
}
- public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
+ public void enqueue(TransactionLogResource queue, EnqueueableMessage message, EnqueueAction postTransactionAction)
{
sync();
- _postTransactionActions.add(postTransactionAction);
initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
if(queue.getMessageDurability().persist(message.isPersistent()))
@@ -197,23 +194,83 @@ public class LocalTransaction implements ServerTransaction
}
beginTranIfNecessary();
- _transaction.enqueueMessage(queue, message);
+ final MessageEnqueueRecord record = _transaction.enqueueMessage(queue, message);
+ if(postTransactionAction != null)
+ {
+ final EnqueueAction underlying = postTransactionAction;
+
+ _postTransactionActions.add(new Action()
+ {
+ @Override
+ public void postCommit()
+ {
+ underlying.postCommit(record);
+ }
+
+ @Override
+ public void onRollback()
+ {
+ underlying.onRollback();
+ }
+ });
+ }
}
catch(RuntimeException e)
{
+ if(postTransactionAction != null)
+ {
+ final EnqueueAction underlying = postTransactionAction;
+
+ _postTransactionActions.add(new Action()
+ {
+ @Override
+ public void postCommit()
+ {
+
+ }
+
+ @Override
+ public void onRollback()
+ {
+ underlying.onRollback();
+ }
+ });
+ }
tidyUpOnError(e);
}
}
+ else
+ {
+ if(postTransactionAction != null)
+ {
+ final EnqueueAction underlying = postTransactionAction;
+ _postTransactionActions.add(new Action()
+ {
+ @Override
+ public void postCommit()
+ {
+ underlying.postCommit((MessageEnqueueRecord)null);
+ }
+
+ @Override
+ public void onRollback()
+ {
+ underlying.onRollback();
+ }
+ });
+ }
+ }
}
- public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message, Action postTransactionAction)
+ public void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message, EnqueueAction postTransactionAction)
{
sync();
- _postTransactionActions.add(postTransactionAction);
initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
try
{
+ final MessageEnqueueRecord[] records = new MessageEnqueueRecord[queues.size()];
+ int i = 0;
for(BaseQueue queue : queues)
{
if(queue.getMessageDurability().persist(message.isPersistent()))
@@ -224,13 +281,53 @@ public class LocalTransaction implements ServerTransaction
}
beginTranIfNecessary();
- _transaction.enqueueMessage(queue, message);
+ records[i] = _transaction.enqueueMessage(queue, message);
}
+ i++;
+ }
+ if(postTransactionAction != null)
+ {
+ final EnqueueAction underlying = postTransactionAction;
+
+ _postTransactionActions.add(new Action()
+ {
+ @Override
+ public void postCommit()
+ {
+ underlying.postCommit(records);
+ }
+
+ @Override
+ public void onRollback()
+ {
+ underlying.onRollback();
+ }
+ });
+ postTransactionAction = null;
}
}
catch(RuntimeException e)
{
+ if(postTransactionAction != null)
+ {
+ final EnqueueAction underlying = postTransactionAction;
+
+ _postTransactionActions.add(new Action()
+ {
+ @Override
+ public void postCommit()
+ {
+
+ }
+
+ @Override
+ public void onRollback()
+ {
+ underlying.onRollback();
+ }
+ });
+ }
tidyUpOnError(e);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
index cae5fa73bf..5251eea9dd 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
@@ -26,6 +26,7 @@ import java.util.List;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -55,6 +56,15 @@ public interface ServerTransaction
public void onRollback();
}
+ public static interface EnqueueAction
+ {
+ public void postCommit(MessageEnqueueRecord... records);
+
+ public void onRollback();
+ }
+
+
+
/**
* Return the time the current transaction started.
*
@@ -75,12 +85,12 @@ public interface ServerTransaction
*/
void addPostTransactionAction(Action postTransactionAction);
- /**
+ /**
* Dequeue a message from a queue registering a post transaction action.
- *
- * A store operation will result only for a persistent message on a durable queue.
+ *
+ * A store operation will result only for a if the record is not null.
*/
- void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction);
+ void dequeue(MessageEnqueueRecord record, Action postTransactionAction);
/**
* Dequeue a message(s) from queue(s) registering a post transaction action.
@@ -94,14 +104,14 @@ public interface ServerTransaction
*
* A store operation will result only for a persistent message on a durable queue.
*/
- void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction);
+ void enqueue(TransactionLogResource queue, EnqueueableMessage message, EnqueueAction postTransactionAction);
/**
* Enqueue a message(s) to queue(s) registering a post transaction action.
*
* Store operations will result only for a persistent messages on durable queues.
*/
- void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message, Action postTransactionAction);
+ void enqueue(List<? extends BaseQueue> queues, EnqueueableMessage message, EnqueueAction postTransactionAction);
/**
* Commit the transaction represented by this object.
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index e0b0d37cbe..df941d16bf 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -86,6 +86,7 @@ import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.GenericRecoverer;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreProvider;
import org.apache.qpid.server.store.StoreException;
@@ -1182,7 +1183,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
if(acquired)
{
- txn.dequeue(messageInstance.getOwningResource(), messageInstance.getMessage(), new ServerTransaction.Action()
+ txn.dequeue(messageInstance.getEnqueueRecord(), new ServerTransaction.Action()
{
public void postCommit()
{
@@ -1201,11 +1202,11 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
final ServerMessage message = entry.getMessage();
final AMQQueue toQueue = (AMQQueue)queue;
- txn.enqueue(toQueue, message, new ServerTransaction.Action()
+ txn.enqueue(toQueue, message, new ServerTransaction.EnqueueAction()
{
- public void postCommit()
+ public void postCommit(MessageEnqueueRecord... records)
{
- toQueue.enqueue(message, null);
+ toQueue.enqueue(message, null, records[0]);
}
public void onRollback()
@@ -1222,12 +1223,12 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
if(entry.acquire())
{
txn.enqueue(toQueue, message,
- new ServerTransaction.Action()
+ new ServerTransaction.EnqueueAction()
{
- public void postCommit()
+ public void postCommit(MessageEnqueueRecord... records)
{
- toQueue.enqueue(message, null);
+ toQueue.enqueue(message, null, records[0]);
}
public void onRollback()
@@ -1235,20 +1236,23 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
entry.release();
}
});
- txn.dequeue(entry.getOwningResource(), message,
- new ServerTransaction.Action()
- {
-
- public void postCommit()
+ if(entry instanceof QueueEntry)
+ {
+ txn.dequeue(entry.getEnqueueRecord(),
+ new ServerTransaction.Action()
{
- entry.delete();
- }
- public void onRollback()
- {
+ public void postCommit()
+ {
+ entry.delete();
+ }
- }
- });
+ public void onRollback()
+ {
+
+ }
+ });
+ }
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
index a24ddec7df..696a4c3da9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -38,12 +37,12 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
@@ -54,6 +53,7 @@ import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.server.txn.DtxBranch;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.transport.Xid;
import org.apache.qpid.transport.util.Functions;
@@ -91,6 +91,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
private final AtomicBoolean _recoveryComplete = new AtomicBoolean();
private final Map<Long, MessageReference<? extends ServerMessage<?>>> _recoveredMessages = new HashMap<>();
private final ExecutorService _queueRecoveryExecutor = Executors.newCachedThreadPool();
+ private final MessageStore.MessageStoreReader _storeReader;
private AtomicBoolean _continueRecovery = new AtomicBoolean(true);
private AsynchronousRecoverer(final VirtualHostImpl<?, ?, ?> virtualHost)
@@ -98,6 +99,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
_virtualHost = virtualHost;
_eventLogger = virtualHost.getEventLogger();
_store = virtualHost.getMessageStore();
+ _storeReader = _store.newMessageStoreReader();
_logSubject = new MessageStoreLogSubject(virtualHost.getName(), _store.getClass().getSimpleName());
_maxMessageId = _store.getNextMessageId();
@@ -107,7 +109,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
public void recover()
{
- getStore().visitDistributedTransactions(new DistributedTransactionVisitor());
+ getStoreReader().visitDistributedTransactions(new DistributedTransactionVisitor());
for(AMQQueue<?> queue : _recoveringQueues)
{
@@ -125,9 +127,9 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
return _eventLogger;
}
- public MessageStore getStore()
+ public MessageStore.MessageStoreReader getStoreReader()
{
- return _store;
+ return _storeReader;
}
public MessageStoreLogSubject getLogSubject()
@@ -143,7 +145,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
private void recoverQueue(AMQQueue<?> queue)
{
MessageInstanceVisitor handler = new MessageInstanceVisitor(queue);
- _store.visitMessageInstances(queue, handler);
+ _storeReader.visitMessageInstances(queue, handler);
getEventLogger().message(getLogSubject(), TransactionLogMessages.RECOVERED(handler.getRecoveredCount(), queue.getName()));
getEventLogger().message(getLogSubject(), TransactionLogMessages.RECOVERY_COMPLETE(queue.getName(), true));
@@ -165,18 +167,18 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
entry.setValue(null); // free up any memory associated with the reference object
}
final List<StoredMessage<?>> messagesToDelete = new ArrayList<>();
- getStore().visitMessages(new MessageHandler()
+ getStoreReader().visitMessages(new MessageHandler()
{
@Override
public boolean handle(final StoredMessage<?> storedMessage)
{
long messageNumber = storedMessage.getMessageNumber();
- if(!_recoveredMessages.containsKey(messageNumber))
+ if (!_recoveredMessages.containsKey(messageNumber))
{
messagesToDelete.add(storedMessage);
}
- return _continueRecovery.get() && messageNumber <_maxMessageId-1;
+ return _continueRecovery.get() && messageNumber < _maxMessageId - 1;
}
});
for(StoredMessage<?> storedMessage : messagesToDelete)
@@ -192,6 +194,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
messagesToDelete.clear();
_recoveredMessages.clear();
+ _storeReader.close();
}
private synchronized ServerMessage<?> getRecoveredMessage(final long messageId)
@@ -199,7 +202,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
MessageReference<? extends ServerMessage<?>> ref = _recoveredMessages.get(messageId);
if (ref == null)
{
- StoredMessage<?> message = _store.getMessage(messageId);
+ StoredMessage<?> message = _storeReader.getMessage(messageId);
if(message != null)
{
StorableMessageMetaData metaData = message.getMetaData();
@@ -234,6 +237,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
{
Thread.currentThread().interrupt();
}
+ _storeReader.close();
}
@@ -241,23 +245,20 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
{
-
@Override
- public boolean handle(long format,
- byte[] globalId,
- byte[] branchId,
- Transaction.Record[] enqueues,
- Transaction.Record[] dequeues)
+ public boolean handle(final Transaction.StoredXidRecord storedXid,
+ final Transaction.EnqueueRecord[] enqueues,
+ final Transaction.DequeueRecord[] dequeues)
{
- Xid id = new Xid(format, globalId, branchId);
+ Xid id = new Xid(storedXid.getFormat(), storedXid.getGlobalId(), storedXid.getBranchId());
DtxRegistry dtxRegistry = getVirtualHost().getDtxRegistry();
DtxBranch branch = dtxRegistry.getBranch(id);
if (branch == null)
{
- branch = new DtxBranch(id, getStore(), getVirtualHost());
+ branch = new DtxBranch(storedXid, _store, getVirtualHost());
dtxRegistry.registerBranch(branch);
}
- for (Transaction.Record record : enqueues)
+ for (Transaction.EnqueueRecord record : enqueues)
{
final AMQQueue<?> queue = getVirtualHost().getQueue(record.getResource().getId());
if (queue != null)
@@ -269,22 +270,32 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
{
final MessageReference<?> ref = message.newReference();
- branch.enqueue(queue, message);
+ final MessageEnqueueRecord[] records = new MessageEnqueueRecord[1];
+ branch.enqueue(queue, message, new Action<MessageEnqueueRecord>()
+ {
+ @Override
+ public void performAction(final MessageEnqueueRecord record)
+ {
+ records[0] = record;
+ }
+ });
branch.addPostTransactionAction(new ServerTransaction.Action()
{
-
+ @Override
public void postCommit()
{
- queue.enqueue(message, null);
+ queue.enqueue(message, null, records[0]);
ref.release();
}
+ @Override
public void onRollback()
{
ref.release();
}
});
+
}
else
{
@@ -306,10 +317,10 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
}
}
- for (Transaction.Record record : dequeues)
+ for (Transaction.DequeueRecord record : dequeues)
{
- final AMQQueue<?> queue = getVirtualHost().getQueue(record.getResource().getId());
+ final AMQQueue<?> queue = getVirtualHost().getQueue(record.getEnqueueRecord().getQueueId());
if (queue != null)
{
@@ -321,7 +332,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
recoverQueue(queue);
}
- final long messageId = record.getMessage().getMessageNumber();
+ final long messageId = record.getEnqueueRecord().getMessageNumber();
final ServerMessage<?> message = getRecoveredMessage(messageId);
if (message != null)
@@ -330,7 +341,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
entry.acquire();
- branch.dequeue(queue, message);
+ branch.dequeue(entry.getEnqueueRecord());
branch.addPostTransactionAction(new ServerTransaction.Action()
{
@@ -362,8 +373,8 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
StringBuilder xidString = xidAsString(id);
getEventLogger().message(getLogSubject(),
TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
- record.getResource()
- .getId()
+ record.getEnqueueRecord()
+ .getQueueId()
.toString()));
}
@@ -427,8 +438,9 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
}
@Override
- public boolean handle(final UUID queueId, long messageId)
+ public boolean handle(final MessageEnqueueRecord record)
{
+ long messageId = record.getMessageNumber();
String queueName = _queue.getName();
if(messageId < _maxMessageId)
@@ -442,7 +454,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
_logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName);
}
- _queue.recover(message);
+ _queue.recover(message, record);
_recoveredCount++;
}
else
@@ -452,8 +464,8 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
+ " referenced in log as enqueued in queue "
+ queueName
+ " is unknown, entry will be discarded");
- Transaction txn = getStore().newTransaction();
- txn.dequeueMessage(_queue, new DummyMessage(messageId));
+ Transaction txn = _store.newTransaction();
+ txn.dequeueMessage(record);
txn.commitTranAsync();
}
return _continueRecovery.get();
@@ -472,31 +484,5 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
}
}
- private static class DummyMessage implements EnqueueableMessage
- {
-
- private final long _messageId;
-
- public DummyMessage(long messageId)
- {
- _messageId = messageId;
- }
-
- public long getMessageNumber()
- {
- return _messageId;
- }
-
- public boolean isPersistent()
- {
- return true;
- }
-
- public StoredMessage getStoredMessage()
- {
- return null;
- }
- }
-
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
index 8bdffeb3c6..64264955fb 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
@@ -33,25 +33,24 @@ import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.Transaction;
-import org.apache.qpid.server.store.Transaction.Record;
-import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.Transaction.EnqueueRecord;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.server.txn.DtxBranch;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.transport.Xid;
import org.apache.qpid.transport.util.Functions;
@@ -64,6 +63,7 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer
{
EventLogger eventLogger = virtualHost.getEventLogger();
MessageStore store = virtualHost.getMessageStore();
+ MessageStore.MessageStoreReader storeReader = store.newMessageStoreReader();
MessageStoreLogSubject logSubject = new MessageStoreLogSubject(virtualHost.getName(), store.getClass().getSimpleName());
Map<String, Integer> queueRecoveries = new TreeMap<>();
@@ -73,10 +73,10 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer
eventLogger.message(logSubject, MessageStoreMessages.RECOVERY_START());
- store.visitMessages(new MessageVisitor(recoveredMessages, unusedMessages));
+ storeReader.visitMessages(new MessageVisitor(recoveredMessages, unusedMessages));
eventLogger.message(logSubject, TransactionLogMessages.RECOVERY_START(null, false));
- store.visitMessageInstances(new MessageInstanceVisitor(virtualHost, store, queueRecoveries,
+ storeReader.visitMessageInstances(new MessageInstanceVisitor(virtualHost, store, queueRecoveries,
recoveredMessages, unusedMessages));
for(Map.Entry<String,Integer> entry : queueRecoveries.entrySet())
{
@@ -95,7 +95,7 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer
}
}
- store.visitDistributedTransactions(new DistributedTransactionVisitor(virtualHost, store, eventLogger,
+ storeReader.visitDistributedTransactions(new DistributedTransactionVisitor(virtualHost, store, eventLogger,
logSubject, recoveredMessages, unusedMessages));
@@ -174,8 +174,10 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer
}
@Override
- public boolean handle(final UUID queueId, long messageId)
+ public boolean handle(final MessageEnqueueRecord record)
{
+ final UUID queueId = record.getQueueId();
+ long messageId = record.getMessageNumber();
AMQQueue<?> queue = _virtualHost.getQueue(queueId);
if(queue != null)
{
@@ -196,7 +198,7 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer
count = 0;
}
- queue.recover(message);
+ queue.recover(message, record);
_queueRecoveries.put(queueName, ++count);
}
@@ -204,7 +206,7 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer
{
_logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queueName + " is unknown, entry will be discarded");
Transaction txn = _store.newTransaction();
- txn.dequeueMessage(queue, new DummyMessage(messageId));
+ txn.dequeueMessage(record);
txn.commitTranAsync();
}
}
@@ -212,28 +214,7 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer
{
_logger.warn("Message id " + messageId + " in log references queue with id " + queueId + " which is not in the configuration, entry will be discarded");
Transaction txn = _store.newTransaction();
- TransactionLogResource mockQueue =
- new TransactionLogResource()
- {
- @Override
- public String getName()
- {
- return "<<UNKNOWN>>";
- }
-
- @Override
- public UUID getId()
- {
- return queueId;
- }
-
- @Override
- public MessageDurability getMessageDurability()
- {
- return MessageDurability.DEFAULT;
- }
- };
- txn.dequeueMessage(mockQueue, new DummyMessage(messageId));
+ txn.dequeueMessage(record);
txn.commitTranAsync();
}
return true;
@@ -267,17 +248,19 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer
}
@Override
- public boolean handle(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+ public boolean handle(final Transaction.StoredXidRecord storedXid,
+ final Transaction.EnqueueRecord[] enqueues,
+ final Transaction.DequeueRecord[] dequeues)
{
- Xid id = new Xid(format, globalId, branchId);
+ Xid id = new Xid(storedXid.getFormat(), storedXid.getGlobalId(), storedXid.getBranchId());
DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry();
DtxBranch branch = dtxRegistry.getBranch(id);
if(branch == null)
{
- branch = new DtxBranch(id, _store, _virtualHost);
+ branch = new DtxBranch(storedXid, _store, _virtualHost);
dtxRegistry.registerBranch(branch);
}
- for(Transaction.Record record : enqueues)
+ for(EnqueueRecord record : enqueues)
{
final AMQQueue<?> queue = _virtualHost.getQueue(record.getResource().getId());
if(queue != null)
@@ -289,23 +272,32 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer
if(message != null)
{
final MessageReference<?> ref = message.newReference();
+ final MessageEnqueueRecord[] records = new MessageEnqueueRecord[1];
- branch.enqueue(queue,message);
-
+ branch.enqueue(queue, message, new Action<MessageEnqueueRecord>()
+ {
+ @Override
+ public void performAction(final MessageEnqueueRecord record)
+ {
+ records[0] = record;
+ }
+ });
branch.addPostTransactionAction(new ServerTransaction.Action()
{
-
+ @Override
public void postCommit()
{
- queue.enqueue(message, null);
+ queue.enqueue(message, null, records[0]);
ref.release();
}
+ @Override
public void onRollback()
{
ref.release();
}
});
+
}
else
{
@@ -324,12 +316,12 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer
}
}
- for(Transaction.Record record : dequeues)
+ for(Transaction.DequeueRecord record : dequeues)
{
- final AMQQueue<?> queue = _virtualHost.getQueue(record.getResource().getId());
+ final AMQQueue<?> queue = _virtualHost.getQueue(record.getEnqueueRecord().getQueueId());
if(queue != null)
{
- final long messageId = record.getMessage().getMessageNumber();
+ final long messageId = record.getEnqueueRecord().getMessageNumber();
final ServerMessage<?> message = _recoveredMessages.get(messageId);
_unusedMessages.remove(messageId);
@@ -339,7 +331,7 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer
entry.acquire();
- branch.dequeue(queue, message);
+ branch.dequeue(entry.getEnqueueRecord());
branch.addPostTransactionAction(new ServerTransaction.Action()
{
@@ -370,7 +362,7 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer
StringBuilder xidString = xidAsString(id);
_eventLogger.message(_logSubject,
TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
- record.getResource().getId().toString()));
+ record.getEnqueueRecord().getQueueId().toString()));
}
}
@@ -395,30 +387,4 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer
}
- private static class DummyMessage implements EnqueueableMessage
- {
-
- private final long _messageId;
-
- public DummyMessage(long messageId)
- {
- _messageId = messageId;
- }
-
- public long getMessageNumber()
- {
- return _messageId;
- }
-
- public boolean isPersistent()
- {
- return true;
- }
-
- public StoredMessage getStoredMessage()
- {
- return null;
- }
- }
-
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index c775a70cb8..b0cced47f6 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -503,7 +503,7 @@ public class TopicExchangeTest extends QpidTestCase
when(message.getMessageNumber()).thenReturn(messageNumber);
for(BaseQueue q : queues)
{
- q.enqueue(message, null);
+ q.enqueue(message, null, null);
}
return queues.size();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index 749b58323b..f3fc11257f 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -179,7 +179,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
_queue.getConsumerCountWithCredit());
// Check sending a message ends up with the subscriber
- _queue.enqueue(messageA, null);
+ _queue.enqueue(messageA, null, null);
try
{
Thread.sleep(2000L);
@@ -198,7 +198,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
1 == _queue.getConsumerCountWithCredit());
ServerMessage messageB = createMessage(new Long (25));
- _queue.enqueue(messageB, null);
+ _queue.enqueue(messageB, null, null);
assertNull(_consumer.getQueueContext());
}
@@ -206,7 +206,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
public void testEnqueueMessageThenRegisterConsumer() throws Exception, InterruptedException
{
ServerMessage messageA = createMessage(new Long(24));
- _queue.enqueue(messageA, null);
+ _queue.enqueue(messageA, null, null);
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerImpl.Option.ACQUIRES,
ConsumerImpl.Option.SEES_REQUEUES));
@@ -223,8 +223,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase
{
ServerMessage messageA = createMessage(new Long(24));
ServerMessage messageB = createMessage(new Long(25));
- _queue.enqueue(messageA, null);
- _queue.enqueue(messageB, null);
+ _queue.enqueue(messageA, null, null);
+ _queue.enqueue(messageB, null, null);
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerImpl.Option.ACQUIRES,
ConsumerImpl.Option.SEES_REQUEUES));
@@ -255,9 +255,9 @@ abstract class AbstractQueueTestBase extends QpidTestCase
/* Enqueue three messages */
- _queue.enqueue(messageA, postEnqueueAction);
- _queue.enqueue(messageB, postEnqueueAction);
- _queue.enqueue(messageC, postEnqueueAction);
+ _queue.enqueue(messageA, postEnqueueAction, null);
+ _queue.enqueue(messageB, postEnqueueAction, null);
+ _queue.enqueue(messageC, postEnqueueAction, null);
Thread.sleep(150); // Work done by QueueRunner Thread
@@ -306,7 +306,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
final long expiration = System.currentTimeMillis() + messageExpirationOffset;
when(messageA.getExpiration()).thenReturn(expiration);
- _queue.enqueue(messageA, postEnqueueAction);
+ _queue.enqueue(messageA, postEnqueueAction, null);
int subFlushWaitTime = 150;
Thread.sleep(subFlushWaitTime); // Work done by QueueRunner Thread
@@ -354,9 +354,9 @@ abstract class AbstractQueueTestBase extends QpidTestCase
/* Enqueue three messages */
- _queue.enqueue(messageA, postEnqueueAction);
- _queue.enqueue(messageB, postEnqueueAction);
- _queue.enqueue(messageC, postEnqueueAction);
+ _queue.enqueue(messageA, postEnqueueAction, null);
+ _queue.enqueue(messageB, postEnqueueAction, null);
+ _queue.enqueue(messageC, postEnqueueAction, null);
Thread.sleep(150); // Work done by QueueRunner Thread
@@ -412,8 +412,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase
/* Enqueue two messages */
- _queue.enqueue(messageA, postEnqueueAction);
- _queue.enqueue(messageB, postEnqueueAction);
+ _queue.enqueue(messageA, postEnqueueAction, null);
+ _queue.enqueue(messageB, postEnqueueAction, null);
Thread.sleep(150); // Work done by QueueRunner Thread
@@ -450,7 +450,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
_queue.getConsumerCountWithCredit());
// Check sending a message ends up with the subscriber
- _queue.enqueue(messageA, null);
+ _queue.enqueue(messageA, null, null);
try
{
Thread.sleep(2000L);
@@ -517,7 +517,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
_consumer.resend(entry);
}
- });
+ }, null);
@@ -530,7 +530,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
ServerMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(message, null);
+ _queue.enqueue(message, null, null);
// Get message id
Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0);
@@ -546,7 +546,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
Long messageId = new Long(i);
ServerMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(message, null);
+ _queue.enqueue(message, null, null);
}
// Get message ids
List<Long> msgids = _queue.getMessagesOnTheQueue(5);
@@ -567,7 +567,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
Long messageId = new Long(i);
ServerMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(message, null);
+ _queue.enqueue(message, null, null);
}
// Get message ids
List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5);
@@ -588,7 +588,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
Long messageId = new Long(i);
ServerMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(message, null);
+ _queue.enqueue(message, null, null);
}
// Get non-existent 0th QueueEntry & check returned list was empty
@@ -763,10 +763,10 @@ abstract class AbstractQueueTestBase extends QpidTestCase
_queue.setAttributes(Collections.<String, Object>singletonMap(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
Integer.valueOf(2)));
- _queue.enqueue(createMessage(new Long(24)), null);
+ _queue.enqueue(createMessage(new Long(24)), null, null);
verifyZeroInteractions(listener);
- _queue.enqueue(createMessage(new Long(25)), null);
+ _queue.enqueue(createMessage(new Long(25)), null, null);
verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold"));
}
@@ -775,9 +775,9 @@ abstract class AbstractQueueTestBase extends QpidTestCase
{
QueueNotificationListener listener = mock(QueueNotificationListener .class);
- _queue.enqueue(createMessage(new Long(24)), null);
- _queue.enqueue(createMessage(new Long(25)), null);
- _queue.enqueue(createMessage(new Long(26)), null);
+ _queue.enqueue(createMessage(new Long(24)), null, null);
+ _queue.enqueue(createMessage(new Long(25)), null, null);
+ _queue.enqueue(createMessage(new Long(26)), null, null);
_queue.setNotificationListener(listener);
_queue.setAttributes(Collections.<String, Object>singletonMap(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
@@ -861,9 +861,9 @@ abstract class AbstractQueueTestBase extends QpidTestCase
public void testOldestMessage()
{
AMQQueue<?> queue = getQueue();
- queue.enqueue(createMessage(1l, (byte)1, Collections.singletonMap("sortKey", (Object) "Z"), 10l), null);
- queue.enqueue(createMessage(2l, (byte)4, Collections.singletonMap("sortKey", (Object) "M"), 100l), null);
- queue.enqueue(createMessage(3l, (byte)9, Collections.singletonMap("sortKey", (Object) "A"), 1000l), null);
+ queue.enqueue(createMessage(1l, (byte)1, Collections.singletonMap("sortKey", (Object) "Z"), 10l), null, null);
+ queue.enqueue(createMessage(2l, (byte)4, Collections.singletonMap("sortKey", (Object) "M"), 100l), null, null);
+ queue.enqueue(createMessage(3l, (byte)9, Collections.singletonMap("sortKey", (Object) "A"), 1000l), null, null);
assertEquals(10l,queue.getOldestMessageArrivalTime());
}
@@ -875,7 +875,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
ServerMessage message = createMessage(1l);
when(message.getArrivalTime()).thenReturn(arrivalTime);
when(message.getExpiration()).thenReturn(expiration);
- queue.enqueue(message,null);
+ queue.enqueue(message,null, null);
queue.visit(new QueueEntryVisitor()
{
@Override
@@ -940,7 +940,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
message = createMessage((long)i);
// Put message on queue
- queue.enqueue(message,null);
+ queue.enqueue(message,null, null);
}
try
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
index faa2d3ffb8..35db9b7ae2 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
@@ -86,7 +86,7 @@ public class LastValueQueueListTest extends TestCase
{
ServerMessage message = createTestServerMessage(null);
- _list.add(message);
+ _list.add(message, null);
int numberOfEntries = countEntries(_list);
assertEquals(1, numberOfEntries);
}
@@ -95,7 +95,7 @@ public class LastValueQueueListTest extends TestCase
{
ServerMessage message = createTestServerMessage(null);
- QueueEntry addedEntry = _list.add(message);
+ QueueEntry addedEntry = _list.add(message, null);
addedEntry.acquire();
addedEntry.delete();
@@ -107,7 +107,7 @@ public class LastValueQueueListTest extends TestCase
{
ServerMessage message = createTestServerMessage(TEST_KEY_VALUE);
- _list.add(message);
+ _list.add(message, null);
int numberOfEntries = countEntries(_list);
assertEquals(1, numberOfEntries);
}
@@ -116,7 +116,7 @@ public class LastValueQueueListTest extends TestCase
{
ServerMessage message = createTestServerMessage(TEST_KEY_VALUE);
- QueueEntry addedEntry = _list.add(message);
+ QueueEntry addedEntry = _list.add(message, null);
addedEntry.acquire();
addedEntry.delete();
@@ -129,8 +129,8 @@ public class LastValueQueueListTest extends TestCase
ServerMessage message1 = createTestServerMessage(TEST_KEY_VALUE1);
ServerMessage message2 = createTestServerMessage(TEST_KEY_VALUE2);
- _list.add(message1);
- _list.add(message2);
+ _list.add(message1, null);
+ _list.add(message2, null);
int numberOfEntries = countEntries(_list);
assertEquals(2, numberOfEntries);
@@ -141,8 +141,8 @@ public class LastValueQueueListTest extends TestCase
ServerMessage message1 = createTestServerMessage(TEST_KEY_VALUE);
ServerMessage message2 = createTestServerMessage(TEST_KEY_VALUE);
- _list.add(message1);
- _list.add(message2);
+ _list.add(message1, null);
+ _list.add(message2, null);
int numberOfEntries = countEntries(_list);
assertEquals(1, numberOfEntries);
@@ -153,10 +153,10 @@ public class LastValueQueueListTest extends TestCase
ServerMessage message1 = createTestServerMessage(TEST_KEY_VALUE);
ServerMessage message2 = createTestServerMessage(TEST_KEY_VALUE);
- QueueEntry entry1 = _list.add(message1);
+ QueueEntry entry1 = _list.add(message1, null);
entry1.acquire(); // simulate an in-progress delivery to consumer
- _list.add(message2);
+ _list.add(message2, null);
assertFalse(entry1.isDeleted());
assertEquals(2, countEntries(_list));
@@ -173,7 +173,7 @@ public class LastValueQueueListTest extends TestCase
ServerMessage message = createTestServerMessage(TEST_KEY_VALUE);
- QueueEntry addedEntry = _list.add(message);
+ QueueEntry addedEntry = _list.add(message, null);
assertEquals(1, countEntries(_list));
assertEquals(1, _list.getLatestValuesMap().size());
@@ -193,8 +193,8 @@ public class LastValueQueueListTest extends TestCase
ServerMessage message1 = createTestServerMessage(TEST_KEY_VALUE1);
ServerMessage message2 = createTestServerMessage(TEST_KEY_VALUE2);
- QueueEntry addedEntry1 = _list.add(message1);
- QueueEntry addedEntry2 = _list.add(message2);
+ QueueEntry addedEntry1 = _list.add(message1, null);
+ QueueEntry addedEntry2 = _list.add(message2, null);
assertEquals(2, countEntries(_list));
assertEquals(2, _list.getLatestValuesMap().size());
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
index fc04af416f..1d8c9f1626 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
@@ -43,11 +43,11 @@ public class LastValueQueueTest extends AbstractQueueTestBase
public void testOldestMessage()
{
AMQQueue<?> queue = getQueue();
- queue.enqueue(createMessage(1l, (byte)1, Collections.singletonMap("lvqKey", (Object) "Z"), 10l), null);
+ queue.enqueue(createMessage(1l, (byte)1, Collections.singletonMap("lvqKey", (Object) "Z"), 10l), null, null);
assertEquals(10l,queue.getOldestMessageArrivalTime());
- queue.enqueue(createMessage(2l, (byte)4, Collections.singletonMap("lvqKey", (Object) "M"), 100l), null);
+ queue.enqueue(createMessage(2l, (byte)4, Collections.singletonMap("lvqKey", (Object) "M"), 100l), null, null);
assertEquals(10l,queue.getOldestMessageArrivalTime());
- queue.enqueue(createMessage(3l, (byte)9, Collections.singletonMap("lvqKey", (Object) "Z"), 1000l), null);
+ queue.enqueue(createMessage(3l, (byte)9, Collections.singletonMap("lvqKey", (Object) "Z"), 1000l), null, null);
assertEquals(100l,queue.getOldestMessageArrivalTime());
}
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
index 08c0de95d5..b704a590e4 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
@@ -26,6 +26,7 @@ import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
@@ -66,6 +67,12 @@ public class MockMessageInstance implements MessageInstance
}
@Override
+ public MessageEnqueueRecord getEnqueueRecord()
+ {
+ return null;
+ }
+
+ @Override
public boolean isAcquiredBy(final ConsumerImpl consumer)
{
return false;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
index 49e333c364..ec5f912a87 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
@@ -88,7 +88,7 @@ public class PriorityQueueListTest extends QpidTestCase
when(ref.getMessage()).thenReturn(message);
when(header.getPriority()).thenReturn(PRIORITIES[i]);
- entries[i] = _list.add(message);
+ entries[i] = _list.add(message, null);
}
_priority4message1 = entries[0];
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
index fdc0411f95..57829664b7 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
@@ -48,19 +48,19 @@ public class PriorityQueueTest extends AbstractQueueTestBase
// Enqueue messages in order
AbstractQueue queue = (AbstractQueue) getQueue();
- queue.enqueue(createMessage(1L, (byte) 10), null);
- queue.enqueue(createMessage(2L, (byte) 4), null);
- queue.enqueue(createMessage(3L, (byte) 0), null);
+ queue.enqueue(createMessage(1L, (byte) 10), null, null);
+ queue.enqueue(createMessage(2L, (byte) 4), null, null);
+ queue.enqueue(createMessage(3L, (byte) 0), null, null);
// Enqueue messages in reverse order
- queue.enqueue(createMessage(4L, (byte) 0), null);
- queue.enqueue(createMessage(5L, (byte) 4), null);
- queue.enqueue(createMessage(6L, (byte) 10), null);
+ queue.enqueue(createMessage(4L, (byte) 0), null, null);
+ queue.enqueue(createMessage(5L, (byte) 4), null, null);
+ queue.enqueue(createMessage(6L, (byte) 10), null, null);
// Enqueue messages out of order
- queue.enqueue(createMessage(7L, (byte) 4), null);
- queue.enqueue(createMessage(8L, (byte) 10), null);
- queue.enqueue(createMessage(9L, (byte) 0), null);
+ queue.enqueue(createMessage(7L, (byte) 4), null, null);
+ queue.enqueue(createMessage(8L, (byte) 10), null, null);
+ queue.enqueue(createMessage(9L, (byte) 0), null, null);
// Register subscriber
queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(ConsumerImpl.Option.class));
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index 2eb083bdb3..fe87a2acf1 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -263,7 +263,7 @@ public abstract class QueueEntryImplTestBase extends TestCase
when(reference.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(reference);
when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
- QueueEntryImpl entry = (QueueEntryImpl) queueEntryList.add(message);
+ QueueEntryImpl entry = (QueueEntryImpl) queueEntryList.add(message, null);
entries[i] = entry;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
index 7f5ea06dcf..43d140f277 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
@@ -56,7 +56,7 @@ public abstract class QueueEntryListTestBase extends TestCase
public void testAddSpecificMessage()
{
final QueueEntryList list = getTestList();
- list.add(getTestMessageToAdd());
+ list.add(getTestMessageToAdd(), null);
final QueueEntryIterator iter = list.iterator();
int count = 0;
@@ -77,7 +77,7 @@ public abstract class QueueEntryListTestBase extends TestCase
{
final QueueEntryList list = getTestList();
final ServerMessage message = createServerMessage(666l);
- list.add(message);
+ list.add(message, null);
final QueueEntryIterator iter = list.iterator();
int count = 0;
@@ -220,8 +220,8 @@ public abstract class QueueEntryListTestBase extends TestCase
QueueEntryList list = getTestList(true);
int i = 0;
- QueueEntry queueEntry1 = list.add(createServerMessage(i++));
- QueueEntry queueEntry2 = list.add(createServerMessage(i++));
+ QueueEntry queueEntry1 = list.add(createServerMessage(i++), null);
+ QueueEntry queueEntry2 = list.add(createServerMessage(i++), null);
assertSame(queueEntry2, list.next(queueEntry1));
assertNull(list.next(queueEntry2));
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
index 79d7628a9c..1d731f5cfd 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
@@ -28,11 +28,13 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.Action;
@@ -60,13 +62,13 @@ public class QueueMessageRecoveryTest extends QpidTestCase
queue.open();
- queue.recover(createMockMessage(0));
- queue.enqueue(createMockMessage(4), null);
- queue.enqueue(createMockMessage(5), null);
- queue.recover(createMockMessage(1));
- queue.recover(createMockMessage(2));
- queue.enqueue(createMockMessage(6), null);
- queue.recover(createMockMessage(3));
+ queue.recover(createMockMessage(0), createEnqueueRecord(0, queue));
+ queue.enqueue(createMockMessage(4), null, null);
+ queue.enqueue(createMockMessage(5), null, null);
+ queue.recover(createMockMessage(1), createEnqueueRecord(1, queue));
+ queue.recover(createMockMessage(2), createEnqueueRecord(2, queue));
+ queue.enqueue(createMockMessage(6), null, null);
+ queue.recover(createMockMessage(3), createEnqueueRecord(3, queue));
assertEquals(4, messageList.size());
for(int i = 0; i < 4; i++)
@@ -76,7 +78,7 @@ public class QueueMessageRecoveryTest extends QpidTestCase
queue.completeRecovery();
- queue.enqueue(createMockMessage(7), null);
+ queue.enqueue(createMockMessage(7), null, null);
assertEquals(8, messageList.size());
@@ -123,7 +125,7 @@ public class QueueMessageRecoveryTest extends QpidTestCase
{
for(int i = 0; i < size; i++)
{
- queue.recover(createMockMessage(i));
+ queue.recover(createMockMessage(i), createEnqueueRecord(i, queue));
}
queue.completeRecovery();
}
@@ -137,7 +139,7 @@ public class QueueMessageRecoveryTest extends QpidTestCase
{
for(int i = 0; i < size; i++)
{
- queue.enqueue(createMockMessage(size + i), null);
+ queue.enqueue(createMockMessage(size + i), null, null);
}
}
}, "publishing thread");
@@ -156,6 +158,24 @@ public class QueueMessageRecoveryTest extends QpidTestCase
}
}
+ private MessageEnqueueRecord createEnqueueRecord(final int messageNumber, final TestQueue queue)
+ {
+ return new MessageEnqueueRecord()
+ {
+ @Override
+ public UUID getQueueId()
+ {
+ return queue.getId();
+ }
+
+ @Override
+ public long getMessageNumber()
+ {
+ return messageNumber;
+ }
+ };
+ }
+
private ServerMessage createMockMessage(final long i)
{
@@ -189,7 +209,7 @@ public class QueueMessageRecoveryTest extends QpidTestCase
}
@Override
- protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action)
+ protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord record)
{
synchronized(_messageList)
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
index ffd91ba093..95788757b5 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
@@ -23,6 +23,7 @@ import org.junit.Assert;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.SortedQueueEntry.Colour;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
/**
* Test extension of SortedQueueEntryList that provides data structure validation tests.
@@ -42,10 +43,10 @@ public class SelfValidatingSortedQueueEntryList extends SortedQueueEntryList
}
@Override /** Overridden to automatically check queue properties before and after. */
- public SortedQueueEntry add(final ServerMessage message)
+ public SortedQueueEntry add(final ServerMessage message, final MessageEnqueueRecord enqueueRecord)
{
assertQueueProperties(); //before add
- final SortedQueueEntry result = super.add(message);
+ final SortedQueueEntry result = super.add(message, enqueueRecord);
assertQueueProperties(); //after add
return result;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
index f7e9768083..a11b0acb24 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
@@ -76,7 +76,7 @@ public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase
when(reference.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(reference);
when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
- return (QueueEntryImpl) queueEntryList.add(message);
+ return (QueueEntryImpl) queueEntryList.add(message, null);
}
public void testCompareTo()
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
index 8dbe22f5b6..06b0ef51b9 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
@@ -129,7 +129,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
for(final String key : keys)
{
final ServerMessage msg = generateTestMessage(messageId++, key);
- _sqel.add(msg);
+ _sqel.add(msg, null);
}
}
@@ -224,7 +224,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
while(messageId < 200)
{
final ServerMessage msg = generateTestMessage(messageId++, "samekey");
- _sqel.add(msg);
+ _sqel.add(msg, null);
}
final QueueEntryIterator iter = getTestList().iterator();
@@ -245,7 +245,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
while(messageId < 200)
{
final ServerMessage msg = generateTestMessage(messageId++, null);
- _sqel.add(msg);
+ _sqel.add(msg, null);
}
final QueueEntryIterator iter = getTestList().iterator();
@@ -267,7 +267,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
{
final ServerMessage msg = generateTestMessage(messageId, textKey);
messageId++;
- _sqel.add(msg);
+ _sqel.add(msg, null);
}
final QueueEntryIterator iter = getTestList().iterator();
@@ -290,7 +290,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
{
final ServerMessage msg = generateTestMessage(messageId, textkeys[i]);
messageId++;
- _sqel.add(msg);
+ _sqel.add(msg, null);
}
final QueueEntryIterator iter = getTestList().iterator();
@@ -308,13 +308,13 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
_sqel = new SelfValidatingSortedQueueEntryList(_testQueue);
ServerMessage msg = generateTestMessage(1, "A");
- _sqel.add(msg);
+ _sqel.add(msg, null);
SortedQueueEntry entry = _sqel.next(_sqel.getHead());
validateEntry(entry, "A", 1);
msg = generateTestMessage(2, "B");
- _sqel.add(msg);
+ _sqel.add(msg, null);
entry = _sqel.next(_sqel.getHead());
validateEntry(entry, "A", 1);
@@ -328,13 +328,13 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
_sqel = new SelfValidatingSortedQueueEntryList(_testQueue);
ServerMessage msg = generateTestMessage(1, "B");
- _sqel.add(msg);
+ _sqel.add(msg, null);
SortedQueueEntry entry = _sqel.next(_sqel.getHead());
validateEntry(entry, "B", 1);
msg = generateTestMessage(2, "A");
- _sqel.add(msg);
+ _sqel.add(msg, null);
entry = _sqel.next(_sqel.getHead());
validateEntry(entry, "A", 2);
@@ -348,12 +348,12 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
_sqel = new SelfValidatingSortedQueueEntryList(_testQueue);
ServerMessage msg = generateTestMessage(1, "A");
- _sqel.add(msg);
+ _sqel.add(msg, null);
SortedQueueEntry entry = _sqel.next(_sqel.getHead());
validateEntry(entry, "A", 1);
msg = generateTestMessage(2, "C");
- _sqel.add(msg);
+ _sqel.add(msg, null);
entry = _sqel.next(_sqel.getHead());
validateEntry(entry, "A", 1);
@@ -362,7 +362,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
validateEntry(entry, "C", 2);
msg = generateTestMessage(3, "B");
- _sqel.add(msg);
+ _sqel.add(msg, null);
entry = _sqel.next(_sqel.getHead());
validateEntry(entry, "A", 1);
@@ -379,13 +379,13 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
_sqel = new SelfValidatingSortedQueueEntryList(_testQueue);
ServerMessage msg = generateTestMessage(1, "B");
- _sqel.add(msg);
+ _sqel.add(msg, null);
SortedQueueEntry entry = _sqel.next(_sqel.getHead());
validateEntry(entry, "B", 1);
msg = generateTestMessage(2, "D");
- _sqel.add(msg);
+ _sqel.add(msg, null);
entry = _sqel.next(_sqel.getHead());
validateEntry(entry, "B", 1);
@@ -394,7 +394,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
validateEntry(entry, "D", 2);
msg = generateTestMessage(3, "C");
- _sqel.add(msg);
+ _sqel.add(msg, null);
entry = _sqel.next(_sqel.getHead());
validateEntry(entry, "B", 1);
@@ -406,7 +406,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
validateEntry(entry, "D", 2);
msg = generateTestMessage(4, "A");
- _sqel.add(msg);
+ _sqel.add(msg, null);
entry = _sqel.next(_sqel.getHead());
validateEntry(entry, "A", 4);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
index 0b72af17bc..785b437375 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
@@ -103,7 +103,7 @@ public class SortedQueueEntryTest extends QueueEntryImplTestBase
when(reference.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(reference);
when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
- return _queueEntryList.add(message);
+ return _queueEntryList.add(message, null);
}
public void testCompareTo()
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
index c592622877..444cdc09fa 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
@@ -81,7 +81,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase
when(message.newReference()).thenReturn(ref);
when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
- final QueueEntry bleh = _sqel.add(message);
+ final QueueEntry bleh = _sqel.add(message, null);
assertNotNull("QE should not have been null", bleh);
}
}
@@ -173,7 +173,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase
when(ref.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(ref);
when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
- QueueEntry bleh = sqel.add(message);
+ QueueEntry bleh = sqel.add(message, null);
assertNotNull("QE should not have been null", bleh);
entriesMap.put(i,bleh);
}
@@ -276,7 +276,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase
when(reference.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(reference);
when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
- entries[i] = (OrderedQueueEntry) queueEntryList.add(message);
+ entries[i] = (OrderedQueueEntry) queueEntryList.add(message, null);
}
// test getNext for not acquired entries
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
index 004096082a..268d11c9c5 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
@@ -36,6 +36,7 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class StandardQueueTest extends AbstractQueueTestBase
@@ -59,7 +60,7 @@ public class StandardQueueTest extends AbstractQueueTestBase
EnumSet.of(ConsumerImpl.Option.ACQUIRES,
ConsumerImpl.Option.SEES_REQUEUES));
- getQueue().enqueue(message, null);
+ getQueue().enqueue(message, null, null);
consumer.close();
assertTrue("Queue was not deleted when consumer was removed",
getQueue().isDeleted());
@@ -290,7 +291,8 @@ public class StandardQueueTest extends AbstractQueueTestBase
* Entries with even message id are considered
* dequeued!
*/
- protected DequeuedQueueEntry createQueueEntry(final ServerMessage message)
+ protected DequeuedQueueEntry createQueueEntry(final ServerMessage message,
+ final MessageEnqueueRecord enqueueRecord)
{
return new DequeuedQueueEntry(this, message);
}
@@ -311,7 +313,7 @@ public class StandardQueueTest extends AbstractQueueTestBase
public DequeuedQueueEntry(DequeuedQueueEntryList list, final ServerMessage message)
{
- super(list, message);
+ super(list, message, null);
_message = message;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
index f4b25665ba..0586f8dcce 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
@@ -114,9 +114,10 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple
protected EnqueueableMessage addMessage(long id)
{
StorableMessageMetaData metaData = createMetaData(id, MESSAGE_DATA.length);
- StoredMessage<?> handle = _store.addMessage(metaData);
- handle.addContent(0, ByteBuffer.wrap(MESSAGE_DATA));
- TestMessage message = new TestMessage(id, handle);
+ MessageHandle<?> handle = _store.addMessage(metaData);
+ handle.addContent(ByteBuffer.wrap(MESSAGE_DATA));
+ StoredMessage<? extends StorableMessageMetaData> storedMessage = handle.allContentAdded();
+ TestMessage message = new TestMessage(id, storedMessage);
return message;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
index 758799b81f..82c03c7545 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@@ -40,7 +41,7 @@ import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.store.Transaction.Record;
+import org.apache.qpid.server.store.Transaction.EnqueueRecord;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
@@ -50,6 +51,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase
{
private MessageStore _store;
private ConfiguredObject<?> _parent;
+ private MessageStore.MessageStoreReader _storeReader;
public void setUp() throws Exception
{
@@ -60,6 +62,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase
_store = createMessageStore();
_store.openMessageStore(_parent);
+ _storeReader = _store.newMessageStoreReader();
}
protected abstract VirtualHost createVirtualHost();
@@ -73,52 +76,56 @@ public abstract class MessageStoreTestCase extends QpidTestCase
protected void reopenStore() throws Exception
{
+ _storeReader.close();
_store.closeMessageStore();
_store = createMessageStore();
_store.openMessageStore(_parent);
+ _storeReader = _store.newMessageStoreReader();
+
}
public void testAddAndRemoveRecordXid() throws Exception
{
long format = 1l;
- Record enqueueRecord = getTestRecord(1);
- Record dequeueRecord = getTestRecord(2);
- Record[] enqueues = { enqueueRecord };
- Record[] dequeues = { dequeueRecord };
+ EnqueueRecord enqueueRecord = getTestRecord(1);
+ TestRecord dequeueRecord = getTestRecord(2);
+ EnqueueRecord[] enqueues = { enqueueRecord };
+ TestRecord[] dequeues = { dequeueRecord };
byte[] globalId = new byte[] { 1 };
byte[] branchId = new byte[] { 2 };
Transaction transaction = _store.newTransaction();
- transaction.recordXid(format, globalId, branchId, enqueues, dequeues);
+ final Transaction.StoredXidRecord record =
+ transaction.recordXid(format, globalId, branchId, enqueues, dequeues);
transaction.commitTran();
reopenStore();
DistributedTransactionHandler handler = mock(DistributedTransactionHandler.class);
- _store.visitDistributedTransactions(handler);
- verify(handler, times(1)).handle(format,globalId, branchId, enqueues, dequeues);
+ _storeReader.visitDistributedTransactions(handler);
+ verify(handler, times(1)).handle(eq(record), argThat(new RecordMatcher(enqueues)), argThat(new DequeueRecordMatcher(dequeues)));
transaction = _store.newTransaction();
- transaction.removeXid(1l, globalId, branchId);
+ transaction.removeXid(record);
transaction.commitTran();
reopenStore();
handler = mock(DistributedTransactionHandler.class);
- _store.visitDistributedTransactions(handler);
- verify(handler, never()).handle(format,globalId, branchId, enqueues, dequeues);
+ _storeReader.visitDistributedTransactions(handler);
+ verify(handler, never()).handle(eq(record), argThat(new RecordMatcher(enqueues)), argThat(new DequeueRecordMatcher(dequeues)));
}
public void testVisitMessages() throws Exception
{
long messageId = 1;
int contentSize = 0;
- final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
- enqueueMessage(message, "dummyQ");
+ final MessageHandle<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
+ enqueueMessage(message.allContentAdded(), "dummyQ");
MessageHandler handler = mock(MessageHandler.class);
- _store.visitMessages(handler);
+ _storeReader.visitMessages(handler);
verify(handler, times(1)).handle(argThat(new MessageMetaDataMatcher(messageId)));
@@ -176,14 +183,14 @@ public abstract class MessageStoreTestCase extends QpidTestCase
int contentSize = 0;
for (int i = 0; i < 3; i++)
{
- final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize));
- enqueueMessage(message, "dummyQ");
+ final MessageHandle<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize));
+ enqueueMessage(message.allContentAdded(), "dummyQ");
}
MessageHandler handler = mock(MessageHandler.class);
when(handler.handle(any(StoredMessage.class))).thenReturn(true, false);
- _store.visitMessages(handler);
+ _storeReader.visitMessages(handler);
verify(handler, times(2)).handle(any(StoredMessage.class));
}
@@ -193,14 +200,14 @@ public abstract class MessageStoreTestCase extends QpidTestCase
int contentSize = 0;
for (int i = 0; i < 3; i++)
{
- final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize));
+ final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(i + 1, contentSize)).allContentAdded();
enqueueMessage(message, "dummyQ");
}
reopenStore();
- final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(4, contentSize));
+ final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(4, contentSize)).allContentAdded();
enqueueMessage(message, "dummyQ");
@@ -212,7 +219,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase
{
long messageId = 1;
int contentSize = 0;
- final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
+ final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)).allContentAdded();
EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, message);
@@ -224,9 +231,8 @@ public abstract class MessageStoreTestCase extends QpidTestCase
transaction.commitTran();
MessageInstanceHandler handler = mock(MessageInstanceHandler.class);
- _store.visitMessageInstances(handler);
-
- verify(handler, times(1)).handle(queueId, messageId);
+ _storeReader.visitMessageInstances(handler);
+ verify(handler, times(1)).handle(argThat(new EnqueueRecordMatcher(queueId, messageId)));
}
public void testVisitDistributedTransactions() throws Exception
@@ -234,19 +240,22 @@ public abstract class MessageStoreTestCase extends QpidTestCase
long format = 1l;
byte[] branchId = new byte[] { 2 };
byte[] globalId = new byte[] { 1 };
- Record enqueueRecord = getTestRecord(1);
- Record dequeueRecord = getTestRecord(2);
- Record[] enqueues = { enqueueRecord };
- Record[] dequeues = { dequeueRecord };
+ EnqueueRecord enqueueRecord = getTestRecord(1);
+ TestRecord dequeueRecord = getTestRecord(2);
+ EnqueueRecord[] enqueues = { enqueueRecord };
+ TestRecord[] dequeues = { dequeueRecord };
Transaction transaction = _store.newTransaction();
- transaction.recordXid(format, globalId, branchId, enqueues, dequeues);
+ final Transaction.StoredXidRecord record =
+ transaction.recordXid(format, globalId, branchId, enqueues, dequeues);
transaction.commitTran();
DistributedTransactionHandler handler = mock(DistributedTransactionHandler.class);
- _store.visitDistributedTransactions(handler);
+ _storeReader.visitDistributedTransactions(handler);
- verify(handler, times(1)).handle(format,globalId, branchId, enqueues, dequeues);
+ verify(handler, times(1)).handle(eq(record),
+ argThat(new RecordMatcher(enqueues)),
+ argThat(new DequeueRecordMatcher(dequeues)));
}
@@ -267,7 +276,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase
txn.commitTran();
QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId);
- getStore().visitMessageInstances(filter);
+ _storeReader.visitMessageInstances(filter);
Set<Long> enqueuedIds = filter.getEnqueuedIds();
assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
@@ -298,7 +307,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase
txn.commitTran();
QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId);
- getStore().visitMessageInstances(filter);
+ _storeReader.visitMessageInstances(filter);
Set<Long> enqueuedIds = filter.getEnqueuedIds();
assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
@@ -333,7 +342,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase
txn.commitTran();
QueueFilteringMessageInstanceHandler filter = new QueueFilteringMessageInstanceHandler(mockQueueId);
- getStore().visitMessageInstances(filter);
+ _storeReader.visitMessageInstances(filter);
Set<Long> enqueuedIds = filter.getEnqueuedIds();
assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
@@ -345,10 +354,10 @@ public abstract class MessageStoreTestCase extends QpidTestCase
{
long messageId = 1;
int contentSize = 0;
- final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize, false));
+ final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize, false)).allContentAdded();
MessageHandler handler = mock(MessageHandler.class);
- _store.visitMessages(handler);
+ _storeReader.visitMessages(handler);
verify(handler, times(0)).handle(argThat(new MessageMetaDataMatcher(messageId)));
}
@@ -357,11 +366,11 @@ public abstract class MessageStoreTestCase extends QpidTestCase
{
long messageId = 1;
int contentSize = 0;
- final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
+ final StoredMessage<TestMessageMetaData> message = _store.addMessage(new TestMessageMetaData(messageId, contentSize)).allContentAdded();
enqueueMessage(message, "dummyQ");
final AtomicReference<StoredMessage<?>> retrievedMessageRef = new AtomicReference<StoredMessage<?>>();
- _store.visitMessages(new MessageHandler()
+ _storeReader.visitMessages(new MessageHandler()
{
@Override
@@ -379,7 +388,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase
retrievedMessage.remove();
retrievedMessageRef.set(null);
- _store.visitMessages(new MessageHandler()
+ _storeReader.visitMessages(new MessageHandler()
{
@Override
@@ -411,7 +420,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase
return enqueueableMessage;
}
- private Record getTestRecord(long messageNumber)
+ private TestRecord getTestRecord(long messageNumber)
{
UUID queueId1 = UUIDGenerator.generateRandomUUID();
TransactionLogResource queue1 = mock(TransactionLogResource.class);
@@ -422,13 +431,13 @@ public abstract class MessageStoreTestCase extends QpidTestCase
final StoredMessage<?> storedMessage = mock(StoredMessage.class);
when(storedMessage.getMessageNumber()).thenReturn(messageNumber);
when(message1.getStoredMessage()).thenReturn(storedMessage);
- Record enqueueRecord = new TestRecord(queue1, message1);
+ TestRecord enqueueRecord = new TestRecord(queue1, message1);
return enqueueRecord;
}
private EnqueueableMessage createEnqueueableMessage(long messageId1)
{
- final StoredMessage<TestMessageMetaData> message1 = _store.addMessage(new TestMessageMetaData(messageId1, 0));
+ final StoredMessage<TestMessageMetaData> message1 = _store.addMessage(new TestMessageMetaData(messageId1, 0)).allContentAdded();
EnqueueableMessage enqueueableMessage1 = createMockEnqueueableMessage(messageId1, message1);
return enqueueableMessage1;
}
@@ -468,9 +477,10 @@ public abstract class MessageStoreTestCase extends QpidTestCase
}
@Override
- public boolean handle(UUID queueId, long messageId)
+ public boolean handle(final MessageEnqueueRecord record)
{
- if (queueId.equals(_queueId))
+ long messageId = record.getMessageNumber();
+ if (record.getQueueId().equals(_queueId))
{
if (_enqueuedIds.contains(messageId))
{
@@ -487,4 +497,103 @@ public abstract class MessageStoreTestCase extends QpidTestCase
}
}
+ private class EnqueueRecordMatcher extends ArgumentMatcher<MessageEnqueueRecord>
+ {
+ private final UUID _queueId;
+ private final long _messageId;
+
+ public EnqueueRecordMatcher(final UUID queueId, final long messageId)
+ {
+ _queueId = queueId;
+ _messageId = messageId;
+ }
+
+ @Override
+ public boolean matches(final Object argument)
+ {
+ if(argument instanceof MessageEnqueueRecord)
+ {
+ MessageEnqueueRecord record = (MessageEnqueueRecord)argument;
+ return record.getQueueId().equals(_queueId) && record.getMessageNumber() == _messageId;
+ }
+ return false;
+ }
+ }
+
+
+ private class RecordMatcher extends ArgumentMatcher<Transaction.EnqueueRecord[]>
+ {
+
+ private final EnqueueRecord[] _expect;
+
+ public RecordMatcher(Transaction.EnqueueRecord[] expect)
+ {
+ _expect = expect;
+ }
+
+ @Override
+ public boolean matches(final Object argument)
+ {
+ if(argument.getClass().isArray() && Transaction.EnqueueRecord.class.isAssignableFrom(argument.getClass().getComponentType()))
+ {
+ Transaction.EnqueueRecord[] actual = (Transaction.EnqueueRecord[]) argument;
+ if(actual.length == _expect.length)
+ {
+ for(int i = 0; i < actual.length; i++)
+ {
+ if(!actual[i].getResource().getId().equals(_expect[i].getResource().getId())
+ || actual[i].getMessage().getMessageNumber() != _expect[i].getMessage().getMessageNumber())
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+
+ }
+ return false;
+ }
+ }
+
+ private class DequeueRecordMatcher extends ArgumentMatcher<Transaction.DequeueRecord[]>
+ {
+
+ private final Transaction.DequeueRecord[] _expect;
+
+ public DequeueRecordMatcher(Transaction.DequeueRecord[] expect)
+ {
+ _expect = expect;
+ }
+
+ @Override
+ public boolean matches(final Object argument)
+ {
+ if(argument.getClass().isArray() && Transaction.DequeueRecord.class.isAssignableFrom(argument.getClass().getComponentType()))
+ {
+ Transaction.DequeueRecord[] actual = (Transaction.DequeueRecord[]) argument;
+ if(actual.length == _expect.length)
+ {
+ for(int i = 0; i < actual.length; i++)
+ {
+ if(!actual[i].getEnqueueRecord().getQueueId().equals(_expect[i].getEnqueueRecord().getQueueId())
+ || actual[i].getEnqueueRecord().getMessageNumber() != _expect[i].getEnqueueRecord().getMessageNumber())
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+
+ }
+ return false;
+ }
+ }
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
index 12b21fa964..eb12b2e320 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
@@ -35,7 +35,7 @@ public class TestMemoryMessageStore extends MemoryMessageStore
public int getMessageCount()
{
final AtomicInteger counter = new AtomicInteger();
- visitMessages(new MessageHandler()
+ newMessageStoreReader().visitMessages(new MessageHandler()
{
@Override
public boolean handle(StoredMessage<?> storedMessage)
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java
index 668d9d5242..0ede362c0d 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java
@@ -20,10 +20,12 @@
*/
package org.apache.qpid.server.store;
+import java.util.UUID;
+
import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.store.Transaction.Record;
+import org.apache.qpid.server.store.Transaction.EnqueueRecord;
-public class TestRecord implements Record
+public class TestRecord implements EnqueueRecord, Transaction.DequeueRecord, MessageEnqueueRecord
{
private TransactionLogResource _queue;
private EnqueueableMessage _message;
@@ -68,11 +70,11 @@ public class TestRecord implements Record
{
return false;
}
- if (!(obj instanceof Record))
+ if (!(obj instanceof EnqueueRecord))
{
return false;
}
- Record other = (Record) obj;
+ EnqueueRecord other = (EnqueueRecord) obj;
if (_message == null && other.getMessage() != null)
{
return false;
@@ -88,4 +90,21 @@ public class TestRecord implements Record
return _queue.getId().equals(other.getResource().getId());
}
-} \ No newline at end of file
+ @Override
+ public MessageEnqueueRecord getEnqueueRecord()
+ {
+ return this;
+ }
+
+ @Override
+ public UUID getQueueId()
+ {
+ return _queue.getId();
+ }
+
+ @Override
+ public long getMessageNumber()
+ {
+ return _message.getMessageNumber();
+ }
+}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
index a61ac4f5d2..e13a972cf9 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
@@ -26,6 +26,7 @@ import java.util.Collections;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.Transaction;
@@ -42,7 +43,7 @@ public class AsyncAutoCommitTransactionTest extends QpidTestCase
private BaseQueue _queue = mock(BaseQueue.class);
private MessageStore _messageStore = mock(MessageStore.class);
private Transaction _storeTransaction = mock(Transaction.class);
- private Action _postTransactionAction = mock(Action.class);
+ private ServerTransaction.EnqueueAction _postTransactionAction = mock(ServerTransaction.EnqueueAction.class);
private FutureResult _future = mock(FutureResult.class);
@@ -70,7 +71,7 @@ public class AsyncAutoCommitTransactionTest extends QpidTestCase
asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction);
verify(_storeTransaction).enqueueMessage(_queue, _message);
- verify(_futureRecorder).recordFuture(_future, _postTransactionAction);
+ verify(_futureRecorder).recordFuture(eq(_future), any(Action.class));
verifyZeroInteractions(_postTransactionAction);
}
@@ -87,7 +88,7 @@ public class AsyncAutoCommitTransactionTest extends QpidTestCase
asyncAutoCommitTransaction.enqueue(Collections.singletonList(_queue), _message, _postTransactionAction);
verify(_storeTransaction).enqueueMessage(_queue, _message);
- verify(_futureRecorder).recordFuture(_future, _postTransactionAction);
+ verify(_futureRecorder).recordFuture(eq(_future), any(Action.class));
verifyZeroInteractions(_postTransactionAction);
}
@@ -104,7 +105,7 @@ public class AsyncAutoCommitTransactionTest extends QpidTestCase
asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction);
verify(_storeTransaction).enqueueMessage(_queue, _message);
- verify(_futureRecorder).recordFuture(_future, _postTransactionAction);
+ verify(_futureRecorder).recordFuture(eq(_future), any(Action.class));
verifyZeroInteractions(_postTransactionAction);
}
@@ -120,7 +121,7 @@ public class AsyncAutoCommitTransactionTest extends QpidTestCase
asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction);
verifyZeroInteractions(_storeTransaction);
- verify(_postTransactionAction).postCommit();
+ verify(_postTransactionAction).postCommit((MessageEnqueueRecord)null);
verifyZeroInteractions(_futureRecorder);
}
@@ -136,7 +137,7 @@ public class AsyncAutoCommitTransactionTest extends QpidTestCase
asyncAutoCommitTransaction.enqueue(_queue, _message, _postTransactionAction);
verifyZeroInteractions(_storeTransaction);
- verify(_futureRecorder).recordFuture(FutureResult.IMMEDIATE_FUTURE, _postTransactionAction);
+ verify(_futureRecorder).recordFuture(eq(FutureResult.IMMEDIATE_FUTURE), any(Action.class));
verifyZeroInteractions(_postTransactionAction);
}
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
index 5abbd7352b..49ececb00f 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
@@ -32,6 +32,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.MockMessageInstance;
import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState;
@@ -225,7 +226,7 @@ public class AutoCommitTransactionTest extends QpidTestCase
_message = createTestMessage(false);
_queue = createTestAMQQueue(false);
- _transaction.dequeue(_queue, _message, _action);
+ _transaction.dequeue((MessageEnqueueRecord)null, _action);
assertEquals("Dequeue of non-persistent message must not cause message to be dequeued", 0, _storeTransaction.getNumberOfDequeuedMessages());
assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -234,23 +235,6 @@ public class AutoCommitTransactionTest extends QpidTestCase
}
- /**
- * Tests the dequeue of a persistent message from a single non durable queue.
- * Asserts that a store transaction has not been started and post commit
- * action fired.
- */
- public void testDequeueFromDurableQueueOfPersistentMessage() throws Exception
- {
- _message = createTestMessage(true);
- _queue = createTestAMQQueue(true);
-
- _transaction.dequeue(_queue, _message, _action);
-
- assertEquals("Dequeue of persistent message to durable queue must cause message to be dequeued",1, _storeTransaction.getNumberOfDequeuedMessages());
- assertEquals("Unexpected transaction state", TransactionState.COMMITTED, _storeTransaction.getState());
- assertFalse("Rollback action must not be fired", _action.isRollbackActionFired());
- assertTrue("Post commit action must be fired", _action.isPostCommitActionFired());
- }
/**
* Tests the case where the store operation throws an exception.
@@ -268,7 +252,7 @@ public class AutoCommitTransactionTest extends QpidTestCase
try
{
- _transaction.dequeue(_queue, _message, _action);
+ _transaction.dequeue(mock(MessageEnqueueRecord.class), _action);
fail("Exception not thrown");
}
catch (RuntimeException re)
@@ -387,7 +371,7 @@ public class AutoCommitTransactionTest extends QpidTestCase
{
final BaseQueue queue = createTestAMQQueue(queueDurableFlags[i]);
final ServerMessage message = createTestMessage(messagePersistentFlags[i]);
-
+ final boolean hasRecord = queueDurableFlags[i] && messagePersistentFlags[i];
queueEntries.add(new MockMessageInstance()
{
@@ -402,7 +386,19 @@ public class AutoCommitTransactionTest extends QpidTestCase
{
return queue;
}
-
+
+ @Override
+ public MessageEnqueueRecord getEnqueueRecord()
+ {
+ if(hasRecord)
+ {
+ return mock(MessageEnqueueRecord.class);
+ }
+ else
+ {
+ return null;
+ }
+ }
});
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
index c905e52715..00ea9d6b01 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
@@ -32,6 +32,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.MockMessageInstance;
import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState;
@@ -222,7 +223,7 @@ public class LocalTransactionTest extends QpidTestCase
_message = createTestMessage(false);
_queue = createQueue(false);
- _transaction.dequeue(_queue, _message, _action1);
+ _transaction.dequeue((MessageEnqueueRecord)null, _action1);
assertEquals("Dequeue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -239,7 +240,7 @@ public class LocalTransactionTest extends QpidTestCase
_message = createTestMessage(true);
_queue = createQueue(true);
- _transaction.dequeue(_queue, _message, _action1);
+ _transaction.dequeue(mock(MessageEnqueueRecord.class), _action1);
assertEquals("Dequeue of non-persistent message must cause message to be dequeued", 1, _storeTransaction.getNumberOfDequeuedMessages());
assertEquals("Unexpected transaction state", TransactionState.STARTED, _storeTransaction.getState());
@@ -261,7 +262,7 @@ public class LocalTransactionTest extends QpidTestCase
try
{
- _transaction.dequeue(_queue, _message, _action1);
+ _transaction.dequeue(mock(MessageEnqueueRecord.class), _action1);
fail("Exception not thrown");
}
catch (RuntimeException re)
@@ -404,7 +405,7 @@ public class LocalTransactionTest extends QpidTestCase
assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
assertFalse("Post commit action must not be fired yet", _action1.isPostCommitActionFired());
- _transaction.dequeue(_queue, _message, _action1);
+ _transaction.dequeue(mock(MessageEnqueueRecord.class), _action1);
assertEquals("Unexpected transaction state", TransactionState.STARTED, _storeTransaction.getState());
assertFalse("Post commit action must not be fired yet", _action1.isPostCommitActionFired());
@@ -428,7 +429,7 @@ public class LocalTransactionTest extends QpidTestCase
assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
assertFalse("Rollback action must not be fired yet", _action1.isRollbackActionFired());
- _transaction.dequeue(_queue, _message, _action1);
+ _transaction.dequeue(mock(MessageEnqueueRecord.class), _action1);
assertEquals("Unexpected transaction state", TransactionState.STARTED, _storeTransaction.getState());
assertFalse("Rollback action must not be fired yet", _action1.isRollbackActionFired());
@@ -451,7 +452,7 @@ public class LocalTransactionTest extends QpidTestCase
_queue = createQueue(true);
_transaction.addPostTransactionAction(_action1);
- _transaction.dequeue(_queue, _message, _action2);
+ _transaction.dequeue(mock(MessageEnqueueRecord.class), _action2);
_transaction.commit();
assertEquals("Unexpected transaction state", TransactionState.COMMITTED, _storeTransaction.getState());
@@ -473,7 +474,7 @@ public class LocalTransactionTest extends QpidTestCase
_queue = createQueue(true);
_transaction.addPostTransactionAction(_action1);
- _transaction.dequeue(_queue, _message, _action2);
+ _transaction.dequeue(mock(MessageEnqueueRecord.class), _action2);
_transaction.rollback();
assertEquals("Unexpected transaction state", TransactionState.ABORTED, _storeTransaction.getState());
@@ -532,7 +533,7 @@ public class LocalTransactionTest extends QpidTestCase
_queue = createQueue(true);
long startTime = System.currentTimeMillis();
- _transaction.dequeue(_queue, _message, _action1);
+ _transaction.dequeue(mock(MessageEnqueueRecord.class), _action1);
assertTrue("Transaction start time should have been recorded", _transaction.getTransactionStartTime() >= startTime);
assertEquals("Transaction update time should be the same as transaction start time", _transaction.getTransactionStartTime(), _transaction.getTransactionUpdateTime());
@@ -552,7 +553,7 @@ public class LocalTransactionTest extends QpidTestCase
final long transactionUpdateTimeAfterFirstEnqueue = _transaction.getTransactionUpdateTime();
Thread.sleep(1);
- _transaction.dequeue(_queue, _message, _action2);
+ _transaction.dequeue(mock(MessageEnqueueRecord.class), _action2);
final long transactionStartTimeAfterFirstDequeue = _transaction.getTransactionStartTime();
final long transactionUpdateTimeAfterFirstDequeue = _transaction.getTransactionUpdateTime();
@@ -611,7 +612,7 @@ public class LocalTransactionTest extends QpidTestCase
{
final TransactionLogResource queue = createQueue(queueDurableFlags[i]);
final ServerMessage message = createTestMessage(messagePersistentFlags[i]);
-
+ final boolean hasRecord = queueDurableFlags[i] && messagePersistentFlags[i];
queueEntries.add(new MockMessageInstance()
{
@@ -626,7 +627,12 @@ public class LocalTransactionTest extends QpidTestCase
{
return queue;
}
-
+
+ @Override
+ public MessageEnqueueRecord getEnqueueRecord()
+ {
+ return hasRecord ? mock(MessageEnqueueRecord.class) : null;
+ }
});
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockAction.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockAction.java
index 8fba28e200..9be896f0c4 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockAction.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockAction.java
@@ -20,18 +20,24 @@
*/
package org.apache.qpid.server.txn;
-import org.apache.qpid.server.txn.ServerTransaction.Action;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
/**
* Mock implementation of a ServerTransaction Action
* allowing its state to be observed.
*
*/
-class MockAction implements Action
+class MockAction implements ServerTransaction.EnqueueAction, ServerTransaction.Action
{
private boolean _rollbackFired = false;
private boolean _postCommitFired = false;
+ public void postCommit(MessageEnqueueRecord... records)
+ {
+ _postCommitFired = true;
+ }
+
+ @Override
public void postCommit()
{
_postCommitFired = true;
@@ -51,4 +57,4 @@ class MockAction implements Action
{
return _postCommitFired;
}
-} \ No newline at end of file
+}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
index 6fcfde0221..8aac7861ae 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
@@ -20,8 +20,11 @@
*/
package org.apache.qpid.server.txn;
+import java.util.UUID;
+
import org.apache.commons.lang.NotImplementedException;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.NullMessageStore;
import org.apache.qpid.server.util.FutureResult;
@@ -60,7 +63,7 @@ class MockStoreTransaction implements Transaction
return _state;
}
- public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
+ public MessageEnqueueRecord enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
{
if (_throwExceptionOnQueueOp)
{
@@ -69,6 +72,7 @@ class MockStoreTransaction implements Transaction
}
_numberOfEnqueuedMessages++;
+ return new MockEnqueueRecord(queue.getId(), message.getMessageNumber());
}
public int getNumberOfDequeuedMessages()
@@ -81,7 +85,8 @@ class MockStoreTransaction implements Transaction
return _numberOfEnqueuedMessages;
}
- public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
+ @Override
+ public void dequeueMessage(final MessageEnqueueRecord enqueueRecord)
{
if (_throwExceptionOnQueueOp)
{
@@ -110,8 +115,19 @@ class MockStoreTransaction implements Transaction
{
}
- public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+ @Override
+ public void removeXid(final StoredXidRecord record)
+ {
+
+ }
+
+ public StoredXidRecord recordXid(long format,
+ byte[] globalId,
+ byte[] branchId,
+ EnqueueRecord[] enqueues,
+ DequeueRecord[] dequeues)
{
+ return null;
}
public static MessageStore createTestTransactionLog(final MockStoreTransaction storeTransaction)
@@ -126,4 +142,27 @@ class MockStoreTransaction implements Transaction
}
};
}
+
+ private static class MockEnqueueRecord implements MessageEnqueueRecord
+ {
+ private final UUID _queueId;
+ private final long _messageNumber;
+
+ public MockEnqueueRecord(final UUID queueId,
+ final long messageNumber)
+ {
+ _queueId = queueId;
+ _messageNumber = messageNumber;
+ }
+
+ public UUID getQueueId()
+ {
+ return _queueId;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageNumber;
+ }
+ }
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java
index 47fd5a7398..b4eddc5033 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java
@@ -161,6 +161,8 @@ public class AbstractVirtualHostTest extends QpidTestCase
{
Map<String,Object> attributes = Collections.<String, Object>singletonMap(AbstractVirtualHost.NAME, getTestName());
final MessageStore store = mock(MessageStore.class);
+ when(store.newMessageStoreReader()).thenReturn(mock(MessageStore.MessageStoreReader.class));
+
AbstractVirtualHost host = new AbstractVirtualHost(attributes, _node)
{
@Override
@@ -200,7 +202,8 @@ public class AbstractVirtualHostTest extends QpidTestCase
public void testActivateInErrorStateAfterOpen() throws Exception
{
- Map<String,Object> attributes = Collections.<String, Object>singletonMap(AbstractVirtualHost.NAME, getTestName());
+ Map<String,Object> attributes = Collections.<String, Object>singletonMap(AbstractVirtualHost.NAME,
+ getTestName());
final MessageStore store = mock(MessageStore.class);
doThrow(new RuntimeException("Cannot open store")).when(store).openMessageStore(any(ConfiguredObject.class));
AbstractVirtualHost host = new AbstractVirtualHost(attributes, _node)
@@ -216,6 +219,7 @@ public class AbstractVirtualHostTest extends QpidTestCase
assertEquals("Unexpected state", State.ERRORED, host.getState());
doNothing().when(store).openMessageStore(any(ConfiguredObject.class));
+ when(store.newMessageStoreReader()).thenReturn(mock(MessageStore.MessageStoreReader.class));
host.setAttributes(Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, State.ACTIVE));
assertEquals("Unexpected state", State.ACTIVE, host.getState());
@@ -239,6 +243,7 @@ public class AbstractVirtualHostTest extends QpidTestCase
assertEquals("Unexpected state", State.ERRORED, host.getState());
doNothing().when(store).openMessageStore(any(ConfiguredObject.class));
+ when(store.newMessageStoreReader()).thenReturn(mock(MessageStore.MessageStoreReader.class));
host.start();
assertEquals("Unexpected state", State.ACTIVE, host.getState());
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
index 9c05ed564a..00089ea130 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
@@ -24,7 +24,6 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
-import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@@ -43,6 +42,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.NullMessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
@@ -50,7 +50,7 @@ import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TestMessageMetaData;
import org.apache.qpid.server.store.Transaction;
-import org.apache.qpid.server.store.Transaction.Record;
+import org.apache.qpid.server.store.Transaction.EnqueueRecord;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
@@ -93,7 +93,7 @@ public class SynchronousMessageStoreRecovererTest extends TestCase
@Override
public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
{
- handler.handle(queue.getId(), messageId);
+ handler.handle(new TestMessageEnqueueRecord(queue.getId(), messageId));
}
};
@@ -104,7 +104,7 @@ public class SynchronousMessageStoreRecovererTest extends TestCase
recoverer.recover(_virtualHost);
ServerMessage<?> message = storedMessage.getMetaData().getType().createMessage(storedMessage);
- verify(queue, times(1)).recover(eq(message));
+ verify(queue, times(1)).recover(eq(message), any(MessageEnqueueRecord.class));
}
@SuppressWarnings("unchecked")
@@ -126,7 +126,7 @@ public class SynchronousMessageStoreRecovererTest extends TestCase
@Override
public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
{
- handler.handle(queue.getId(), messageId);
+ handler.handle(new TestMessageEnqueueRecord(queue.getId(), messageId));
}
@Override
@@ -142,8 +142,8 @@ public class SynchronousMessageStoreRecovererTest extends TestCase
recoverer = new SynchronousMessageStoreRecoverer();
recoverer.recover(_virtualHost);
- verify(queue, never()).enqueue(any(ServerMessage.class), any(Action.class));
- verify(transaction).dequeueMessage(same(queue), argThat(new MessageNumberMatcher(messageId)));
+ verify(queue, never()).enqueue(any(ServerMessage.class), any(Action.class), any(MessageEnqueueRecord.class));
+ verify(transaction).dequeueMessage(argThat(new MessageEnqueueRecordMatcher(queue.getId(), messageId)));
verify(transaction, times(1)).commitTranAsync();
}
@@ -165,7 +165,7 @@ public class SynchronousMessageStoreRecovererTest extends TestCase
@Override
public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
{
- handler.handle(queueId, messageId);
+ handler.handle(new TestMessageEnqueueRecord(queueId, messageId));
}
@Override
@@ -181,7 +181,7 @@ public class SynchronousMessageStoreRecovererTest extends TestCase
recoverer = new SynchronousMessageStoreRecoverer();
recoverer.recover(_virtualHost);
- verify(transaction).dequeueMessage(argThat(new QueueIdMatcher(queueId)), argThat(new MessageNumberMatcher(messageId)));
+ verify(transaction).dequeueMessage(argThat(new MessageEnqueueRecordMatcher(queueId,messageId)));
verify(transaction, times(1)).commitTranAsync();
}
@@ -226,13 +226,13 @@ public class SynchronousMessageStoreRecovererTest extends TestCase
long messageId = storedMessage.getMessageNumber();
EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, storedMessage);
- Record enqueueRecord = createMockRecord(queue, enqueueableMessage);
+ EnqueueRecord enqueueRecord = createMockRecord(queue, enqueueableMessage);
final long format = 1;
final byte[] globalId = new byte[] {0};
final byte[] branchId = new byte[] {0};
- final Record[] enqueues = { enqueueRecord };
- final Record[] dequeues = {};
+ final EnqueueRecord[] enqueues = { enqueueRecord };
+ final Transaction.DequeueRecord[] dequeues = {};
MessageStore store = new NullMessageStore()
{
@@ -251,7 +251,26 @@ public class SynchronousMessageStoreRecovererTest extends TestCase
@Override
public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
{
- handler.handle(format, globalId, branchId, enqueues, dequeues);
+ handler.handle(new Transaction.StoredXidRecord()
+ {
+ @Override
+ public long getFormat()
+ {
+ return format;
+ }
+
+ @Override
+ public byte[] getGlobalId()
+ {
+ return globalId;
+ }
+
+ @Override
+ public byte[] getBranchId()
+ {
+ return branchId;
+ }
+ }, enqueues, dequeues);
}
@Override
@@ -275,22 +294,21 @@ public class SynchronousMessageStoreRecovererTest extends TestCase
branch.commit();
ServerMessage<?> message = storedMessage.getMetaData().getType().createMessage(storedMessage);
- verify(queue, times(1)).enqueue(eq(message), (Action<? super MessageInstance>)isNull());
+ verify(queue, times(1)).enqueue(eq(message), (Action<? super MessageInstance>)isNull(), any(MessageEnqueueRecord.class));
verify(transaction).commitTran();
}
public void testRecoveryOfSingleDequeueWithDistributedTransaction()
{
- final AMQQueue<?> queue = createRegisteredMockQueue();
+ final UUID queueId = UUID.randomUUID();
+ final AMQQueue<?> queue = createRegisteredMockQueue(queueId);
final Transaction transaction = mock(Transaction.class);
-
final StoredMessage<StorableMessageMetaData> storedMessage = createMockStoredMessage(1);
final long messageId = storedMessage.getMessageNumber();
- EnqueueableMessage enqueueableMessage = createMockEnqueueableMessage(messageId, storedMessage);
- Record dequeueRecord = createMockRecord(queue, enqueueableMessage);
+ Transaction.DequeueRecord dequeueRecord = createMockDequeueRecord(queueId, messageId);
QueueEntry queueEntry = mock(QueueEntry.class);
when(queue.getMessageOnTheQueue(messageId)).thenReturn(queueEntry);
@@ -298,8 +316,8 @@ public class SynchronousMessageStoreRecovererTest extends TestCase
final long format = 1;
final byte[] globalId = new byte[] {0};
final byte[] branchId = new byte[] {0};
- final Record[] enqueues = {};
- final Record[] dequeues = { dequeueRecord };
+ final EnqueueRecord[] enqueues = {};
+ final Transaction.DequeueRecord[] dequeues = { dequeueRecord };
MessageStore store = new NullMessageStore()
{
@@ -314,13 +332,32 @@ public class SynchronousMessageStoreRecovererTest extends TestCase
{
// We need the message to be enqueued onto the queue so that later the distributed transaction
// can dequeue it.
- handler.handle(queue.getId(), messageId);
+ handler.handle(new TestMessageEnqueueRecord(queue.getId(), messageId));
}
@Override
public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
{
- handler.handle(format, globalId, branchId, enqueues, dequeues);
+ handler.handle(new Transaction.StoredXidRecord()
+ {
+ @Override
+ public long getFormat()
+ {
+ return format;
+ }
+
+ @Override
+ public byte[] getGlobalId()
+ {
+ return globalId;
+ }
+
+ @Override
+ public byte[] getBranchId()
+ {
+ return branchId;
+ }
+ }, enqueues, dequeues);
}
@Override
@@ -348,14 +385,25 @@ public class SynchronousMessageStoreRecovererTest extends TestCase
}
- protected Record createMockRecord(AMQQueue<?> queue, EnqueueableMessage enqueueableMessage)
+ protected EnqueueRecord createMockRecord(AMQQueue<?> queue, EnqueueableMessage enqueueableMessage)
{
- Record enqueueRecord = mock(Record.class);
+ EnqueueRecord enqueueRecord = mock(EnqueueRecord.class);
when(enqueueRecord.getMessage()).thenReturn(enqueueableMessage);
when(enqueueRecord.getResource()).thenReturn(queue);
return enqueueRecord;
}
+
+ protected Transaction.DequeueRecord createMockDequeueRecord(UUID queueId, long messageNumber)
+ {
+ Transaction.DequeueRecord dequeueRecord = mock(Transaction.DequeueRecord.class);
+ MessageEnqueueRecord enqueueRecord = mock(MessageEnqueueRecord.class);
+ when(enqueueRecord.getMessageNumber()).thenReturn(messageNumber);
+ when(enqueueRecord.getQueueId()).thenReturn(queueId);
+ when(dequeueRecord.getEnqueueRecord()).thenReturn(enqueueRecord);
+ return dequeueRecord;
+ }
+
protected EnqueueableMessage createMockEnqueueableMessage(long messageId,
final StoredMessage<StorableMessageMetaData> storedMessage)
{
@@ -378,8 +426,12 @@ public class SynchronousMessageStoreRecovererTest extends TestCase
private AMQQueue<?> createRegisteredMockQueue()
{
+ return createRegisteredMockQueue(UUID.randomUUID());
+ }
+
+ private AMQQueue<?> createRegisteredMockQueue(UUID queueId)
+ {
AMQQueue<?> queue = mock(AMQQueue.class);
- final UUID queueId = UUID.randomUUID();
when(queue.getMessageDurability()).thenReturn(MessageDurability.DEFAULT);
when(queue.getId()).thenReturn(queueId);
when(queue.getName()).thenReturn("test-queue");
@@ -404,19 +456,47 @@ public class SynchronousMessageStoreRecovererTest extends TestCase
}
}
- private final class MessageNumberMatcher extends ArgumentMatcher<EnqueueableMessage>
+ private final class MessageEnqueueRecordMatcher extends ArgumentMatcher<MessageEnqueueRecord>
{
private final long _messageId;
+ private final UUID _queueId;
- private MessageNumberMatcher(long messageId)
+ private MessageEnqueueRecordMatcher(UUID queueId, long messageId)
{
_messageId = messageId;
+ _queueId = queueId;
}
@Override
public boolean matches(Object argument)
{
- return argument instanceof EnqueueableMessage && ((EnqueueableMessage)argument).getMessageNumber() == _messageId;
+ return argument instanceof MessageEnqueueRecord
+ && ((MessageEnqueueRecord)argument).getMessageNumber() == _messageId
+ && ((MessageEnqueueRecord)argument).getQueueId().equals(_queueId);
+ }
+ }
+
+ private class TestMessageEnqueueRecord implements MessageEnqueueRecord
+ {
+ private final UUID _queueId;
+ private final long _messageId;
+
+ public TestMessageEnqueueRecord(final UUID queueId, final long messageId)
+ {
+ _queueId = queueId;
+ _messageId = messageId;
+ }
+
+ @Override
+ public UUID getQueueId()
+ {
+ return _queueId;
+ }
+
+ @Override
+ public long getMessageNumber()
+ {
+ return _messageId;
}
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 5affe3019c..db053eef26 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -377,7 +377,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
private void forceDequeue(final MessageInstance entry, final boolean restoreCredit)
{
AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(_session.getVirtualHost().getMessageStore());
- dequeueTxn.dequeue(entry.getOwningResource(), entry.getMessage(),
+ dequeueTxn.dequeue(entry.getEnqueueRecord(),
new ServerTransaction.Action()
{
public void postCommit()
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
index 69abcd7727..bd04db11ae 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
@@ -76,12 +76,6 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte
}
@Override
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
int size = messageContent.length - offsetInMessage;
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
index 209eae9ad1..bd99458d51 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
@@ -83,12 +83,6 @@ public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, M
}
@Override
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
return serverMsg.getContent(dst, offsetInMessage);
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 67204427fb..12f79cf203 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -514,7 +514,7 @@ public class ServerSession extends Session
public void acknowledge(final ConsumerTarget_0_10 sub, final MessageInstance entry)
{
- _transaction.dequeue(entry.getOwningResource(), entry.getMessage(),
+ _transaction.dequeue(entry.getEnqueueRecord(),
new ServerTransaction.Action()
{
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 75a67c6c2a..694dda4da2 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -40,6 +40,7 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.store.MessageHandle;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.ArrivalTimeFilter;
@@ -493,13 +494,14 @@ public class ServerSessionDelegate extends SessionDelegate
private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer xfr,
final MessageMetaData_0_10 messageMetaData, final MessageStore store)
{
- final StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData);
+ final MessageHandle<MessageMetaData_0_10> addedMessage = store.addMessage(messageMetaData);
ByteBuffer body = xfr.getBody();
if(body != null)
{
- storeMessage.addContent(0, body);
+ addedMessage.addContent(body);
}
- return storeMessage;
+ final StoredMessage<MessageMetaData_0_10> storedMessage = addedMessage.allContentAdded();
+ return storedMessage;
}
@Override
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 16ea23b765..522408910d 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -96,6 +96,7 @@ import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.MessageHandle;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -429,22 +430,24 @@ public class AMQChannel
contentHeader,
getConnection().getLastReceivedTime());
- final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(messageMetaData);
- final AMQMessage amqMessage = createAMQMessage(handle);
- MessageReference reference = amqMessage.newReference();
- try
+ final MessageHandle<MessageMetaData> handle = _messageStore.addMessage(messageMetaData);
+ int bodyCount = _currentMessage.getBodyCount();
+ if(bodyCount > 0)
{
- int bodyCount = _currentMessage.getBodyCount();
- if(bodyCount > 0)
+ long bodyLengthReceived = 0;
+ for(int i = 0 ; i < bodyCount ; i++)
{
- long bodyLengthReceived = 0;
- for(int i = 0 ; i < bodyCount ; i++)
- {
- ContentBody contentChunk = _currentMessage.getContentChunk(i);
- handle.addContent((int)bodyLengthReceived, ByteBuffer.wrap(contentChunk.getPayload()));
- bodyLengthReceived += contentChunk.getSize();
- }
+ ContentBody contentChunk = _currentMessage.getContentChunk(i);
+ handle.addContent(ByteBuffer.wrap(contentChunk.getPayload()));
+ bodyLengthReceived += contentChunk.getSize();
}
+ }
+ final StoredMessage<MessageMetaData> storedMessage = handle.allContentAdded();
+
+ final AMQMessage amqMessage = createAMQMessage(storedMessage);
+ MessageReference reference = amqMessage.newReference();
+ try
+ {
_currentMessage = null;
@@ -500,7 +503,7 @@ public class AMQChannel
.createBasicAckBody(_confirmedMessageCounter, false);
_connection.writeFrame(responseBody.generateFrame(_channelId));
}
- incrementUncommittedMessageSize(handle);
+ incrementUncommittedMessageSize(storedMessage);
incrementOutstandingTxnsIfNecessary();
}
}
@@ -1512,7 +1515,7 @@ public class AMQChannel
try
{
entry.delete();
- txn.dequeue(queue, message,
+ txn.dequeue(entry.getEnqueueRecord(),
new ServerTransaction.Action()
{
@Override
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index 5f7d5fe46e..d46ed719b2 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -189,7 +189,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
// The send may of course still fail, in which case, as
// the message is unacked, it will be lost.
- _txn.dequeue(entry.getOwningResource(), entry.getMessage(), NOOP);
+ _txn.dequeue(entry.getEnqueueRecord(), NOOP);
ServerMessage message = entry.getMessage();
MessageReference ref = message.newReference();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
index 55746dfa95..ef42a1311f 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
@@ -88,12 +88,6 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
}
@Override
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
int size = messageContent.length - offsetInMessage;
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
index 55fc865850..406566cd4c 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
@@ -33,6 +33,7 @@ import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -105,22 +106,21 @@ public class AckTest extends QpidTestCase
final MessageMetaData mmd = new MessageMetaData(publishBody,cb, System.currentTimeMillis());
- final StoredMessage<MessageMetaData> result =_messageStore.addMessage(mmd);
+ final StoredMessage<MessageMetaData> result =_messageStore.addMessage(mmd).allContentAdded();
final StoredMessage storedMessage = result;
final AMQMessage message = new AMQMessage(storedMessage);
ServerTransaction txn = new AutoCommitTransaction(_messageStore);
txn.enqueue(_queue, message,
- new ServerTransaction.Action()
+ new ServerTransaction.EnqueueAction()
{
- public void postCommit()
+ public void postCommit(MessageEnqueueRecord... records)
{
- _queue.enqueue(message,null);
+ _queue.enqueue(message,null, null);
}
public void onRollback()
{
- //To change body of implemented methods use File | Settings | File Templates.
}
});
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
index dcdac07eae..7dd4734e6b 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
@@ -178,7 +178,7 @@ public class AcknowledgeTest extends QpidTestCase
private void checkStoreContents(int messageCount)
{
MessageCounter counter = new MessageCounter();
- _messageStore.visitMessages(counter);
+ _messageStore.newMessageStoreReader().visitMessages(counter);
assertEquals("Message header count incorrect in the MetaDataMap", messageCount, counter.getCount());
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
index f21bf07fae..c6aea39aa6 100755
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
@@ -26,9 +26,10 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MessagePublishInfo;
+import org.apache.qpid.server.store.MessageHandle;
import org.apache.qpid.server.store.StoredMessage;
-public class MockStoredMessage implements StoredMessage<MessageMetaData>
+public class MockStoredMessage implements StoredMessage<MessageMetaData>, MessageHandle<MessageMetaData>
{
private long _messageId;
private MessageMetaData _metaData;
@@ -72,12 +73,17 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData>
return _messageId;
}
- public void addContent(int offsetInMessage, ByteBuffer src)
+ public void addContent(ByteBuffer src)
{
src = src.duplicate();
- ByteBuffer dst = _content.duplicate();
- dst.position(offsetInMessage);
- dst.put(src);
+ _content.put(src);
+ }
+
+ @Override
+ public StoredMessage<MessageMetaData> allContentAdded()
+ {
+ _content.flip();
+ return this;
}
public int getContent(int offset, ByteBuffer dst)
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
index 816485c42b..264350ff8d 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
@@ -135,7 +135,7 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase
private void checkStoreContents(int messageCount)
{
MessageCounter counter = new MessageCounter();
- _messageStore.visitMessages(counter);
+ _messageStore.newMessageStoreReader().visitMessages(counter);
assertEquals("Message header count incorrect in the MetaDataMap", messageCount, counter.getCount());
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
index 6059f7f4e5..de3b68a0bc 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
@@ -60,7 +60,7 @@ public class ReferenceCountingTest extends QpidTestCase
final MessageMetaData mmd = new MessageMetaData(info, chb);
- StoredMessage storedMessage = _store.addMessage(mmd);
+ StoredMessage storedMessage = _store.addMessage(mmd).allContentAdded();
Transaction txn = _store.newTransaction();
txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage));
txn.commitTran();
@@ -78,7 +78,7 @@ public class ReferenceCountingTest extends QpidTestCase
private int getStoreMessageCount()
{
MessageCounter counter = new MessageCounter();
- _store.visitMessages(counter);
+ _store.newMessageStoreReader().visitMessages(counter);
return counter.getCount();
}
@@ -99,7 +99,7 @@ public class ReferenceCountingTest extends QpidTestCase
final MessageMetaData mmd = new MessageMetaData(info, chb);
- StoredMessage storedMessage = _store.addMessage(mmd);
+ StoredMessage storedMessage = _store.addMessage(mmd).allContentAdded();
Transaction txn = _store.newTransaction();
txn.enqueueMessage(createTransactionLogResource("dummyQ"), createEnqueueableMessage(storedMessage));
txn.commitTran();
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index fa2e543f8d..680c05ec0a 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -390,7 +390,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
if(outcome instanceof Accepted)
{
_queueEntry.lockAcquisition();
- txn.dequeue(_queueEntry.getOwningResource(), _queueEntry.getMessage(),
+ txn.dequeue(_queueEntry.getEnqueueRecord(),
new ServerTransaction.Action()
{
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
index 3572b98cad..e9e81bb623 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
@@ -235,12 +235,6 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
}
@Override
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
ByteBuffer buf = allData.duplicate();
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
index d0843c349d..e8e31dcd15 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
@@ -24,6 +24,7 @@ import org.apache.qpid.amqp_1_0.type.Outcome;
import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.txn.ServerTransaction;
public class QueueDestination extends MessageSourceDestination implements SendingDestination, ReceivingDestination
@@ -45,16 +46,16 @@ public class QueueDestination extends MessageSourceDestination implements Sendin
public Outcome send(final Message_1_0 message, ServerTransaction txn)
{
- txn.enqueue(getQueue(),message, new ServerTransaction.Action()
+ txn.enqueue(getQueue(),message, new ServerTransaction.EnqueueAction()
{
MessageReference _reference = message.newReference();
- public void postCommit()
+ public void postCommit(MessageEnqueueRecord... records)
{
try
{
- getQueue().enqueue(message, null);
+ getQueue().enqueue(message, null, records[0]);
}
finally
{
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
index 83430e6008..5deec2bb35 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
@@ -42,6 +42,7 @@ import org.apache.qpid.amqp_1_0.type.transport.Detach;
import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.store.MessageHandle;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -150,17 +151,17 @@ public class ReceivingLink_1_0 implements ReceivingLinkListener, Link_1_0, Deliv
_sectionDecoder,
immutableSections);
- StoredMessage<MessageMetaData_1_0> storedMessage = _vhost.getMessageStore().addMessage(mmd);
+ MessageHandle<MessageMetaData_1_0> handle = _vhost.getMessageStore().addMessage(mmd);
boolean skipping = true;
int offset = 0;
for(ByteBuffer bareMessageBuf : immutableSections)
{
- storedMessage.addContent(offset, bareMessageBuf.duplicate());
+ handle.addContent(bareMessageBuf.duplicate());
offset += bareMessageBuf.remaining();
}
-
+ final StoredMessage<MessageMetaData_1_0> storedMessage = handle.allContentAdded();
Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession().getConnection().getReference());
MessageReference<Message_1_0> reference = message.newReference();
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 0353b9375d..ec57284a78 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -410,59 +410,40 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
if(Boolean.TRUE.equals(detach.getClosed())
|| !(TerminusDurability.UNSETTLED_STATE.equals(_durability)|| TerminusDurability.CONFIGURATION.equals(_durability)))
{
- while(!_consumer.trySendLock())
- {
- synchronized (endpoint.getLock())
- {
- try
- {
- endpoint.getLock().wait(100);
- }
- catch (InterruptedException e)
- {
- }
- }
- }
- try
- {
- _consumer.close();
+ _consumer.close();
- Modified state = new Modified();
- state.setDeliveryFailed(true);
+ Modified state = new Modified();
+ state.setDeliveryFailed(true);
- for(UnsettledAction action : _unsettledActionMap.values())
- {
+ for(UnsettledAction action : _unsettledActionMap.values())
+ {
- action.process(state,Boolean.TRUE);
- }
- _unsettledActionMap.clear();
+ action.process(state,Boolean.TRUE);
+ }
+ _unsettledActionMap.clear();
- endpoint.close();
+ endpoint.close();
- if(_destination instanceof ExchangeDestination
- && (_durability == TerminusDurability.CONFIGURATION
- || _durability == TerminusDurability.UNSETTLED_STATE))
+ if(_destination instanceof ExchangeDestination
+ && (_durability == TerminusDurability.CONFIGURATION
+ || _durability == TerminusDurability.UNSETTLED_STATE))
+ {
+ try
{
- try
- {
- _vhost.removeQueue((AMQQueue)_queue);
- }
- catch (AccessControlException e)
- {
- //TODO
- _logger.error("Error registering subscription", e);
- }
+ _vhost.removeQueue((AMQQueue)_queue);
}
-
- if(_closeAction != null)
+ catch (AccessControlException e)
{
- _closeAction.run();
+ //TODO
+ _logger.error("Error registering subscription", e);
}
}
- finally
+
+ if(_closeAction != null)
{
- _consumer.releaseSendLock();
+ _closeAction.run();
}
+
}
else if(detach.getError() != null
&& !_linkAttachment.getEndpoint().getSession().isSyntheticError(detach.getError()))
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
index 0c47ddf137..789789ac33 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
@@ -87,12 +87,6 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1
}
@Override
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
int size = messageContent.length - offsetInMessage;
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
index 02c3373f85..a9637e9d4e 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
@@ -192,12 +192,6 @@ public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTra
}
@Override
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
return message.getContent(dst, offsetInMessage);
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
index fbc809305e..ee16d96d5b 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
@@ -81,12 +81,6 @@ public class MessageConverter_0_8_to_0_10 implements MessageConverter<AMQMessag
}
@Override
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
return message_0_8.getContent(dst, offsetInMessage);
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
index 647e6be28a..d4529aedb1 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
@@ -89,12 +89,6 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_
}
@Override
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
int size = messageContent.length - offsetInMessage;
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index 8a3ef65979..8e64757cbb 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -59,6 +59,7 @@ import org.apache.qpid.server.plugin.SystemNodeCreator;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -1071,6 +1072,12 @@ class ManagementNode implements MessageSource, MessageDestination
}
@Override
+ public MessageEnqueueRecord getEnqueueRecord()
+ {
+ return null;
+ }
+
+ @Override
public boolean isAcquiredBy(final ConsumerImpl consumer)
{
return false;
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
index 501ce40db7..dd5b7540a7 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
@@ -26,6 +26,7 @@ import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
@@ -90,6 +91,12 @@ class ManagementResponse implements MessageInstance
}
@Override
+ public MessageEnqueueRecord getEnqueueRecord()
+ {
+ return null;
+ }
+
+ @Override
public boolean isAcquiredBy(final ConsumerImpl consumer)
{
return consumer == _consumer && !isDeleted();
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
index e30571615c..4639236d60 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
@@ -256,6 +256,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.setContextPath("/");
+ root.setCompactPath(true);
server.setHandler(root);
server.setSendServerVersion(false);
final ErrorHandler errorHandler = new ErrorHandler()
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
index ab7d84e7fa..673842e387 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
@@ -603,7 +603,7 @@ public class VirtualHostMessageStoreTest extends QpidTestCase
MessageMetaData mmd = new MessageMetaData(messageInfo, headerBody, System.currentTimeMillis());
- final StoredMessage<MessageMetaData> storedMessage = _virtualHost.getMessageStore().addMessage(mmd);
+ final StoredMessage<MessageMetaData> storedMessage = _virtualHost.getMessageStore().addMessage(mmd).allContentAdded();
final AMQMessage currentMessage = new AMQMessage(storedMessage);