diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java')
-rwxr-xr-x | qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java | 61 |
1 files changed, 35 insertions, 26 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java index 36e9d78440..a67d4badd1 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.txn; +import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -30,7 +31,7 @@ import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.store.TransactionLog; +import org.apache.qpid.server.store.MessageStore; /** * An implementation of ServerTransaction where each enqueue/dequeue @@ -43,11 +44,11 @@ public class AutoCommitTransaction implements ServerTransaction { protected static final Logger _logger = Logger.getLogger(AutoCommitTransaction.class); - private final TransactionLog _transactionLog; + private final MessageStore _messageStore; - public AutoCommitTransaction(TransactionLog transactionLog) + public AutoCommitTransaction(MessageStore transactionLog) { - _transactionLog = transactionLog; + _messageStore = transactionLog; } public long getTransactionStartTime() @@ -59,14 +60,14 @@ public class AutoCommitTransaction implements ServerTransaction * Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered * by the caller are executed immediately. */ - public void addPostTransactionAction(Action immediateAction) + public void addPostTransactionAction(final Action immediateAction) { immediateAction.postCommit(); } public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) { - TransactionLog.Transaction txn = null; + MessageStore.Transaction txn = null; try { if(message.isPersistent() && queue.isDurable()) @@ -76,8 +77,8 @@ public class AutoCommitTransaction implements ServerTransaction _logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString()); } - txn = _transactionLog.newTransaction(); - txn.dequeueMessage(queue, message.getMessageNumber()); + txn = _messageStore.newTransaction(); + txn.dequeueMessage(queue, message); txn.commitTran(); txn = null; } @@ -98,7 +99,7 @@ public class AutoCommitTransaction implements ServerTransaction public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction) { - TransactionLog.Transaction txn = null; + MessageStore.Transaction txn = null; try { for(QueueEntry entry : queueEntries) @@ -115,10 +116,10 @@ public class AutoCommitTransaction implements ServerTransaction if(txn == null) { - txn = _transactionLog.newTransaction(); + txn = _messageStore.newTransaction(); } - txn.dequeueMessage(queue, message.getMessageNumber()); + txn.dequeueMessage(queue, message); } } @@ -145,7 +146,7 @@ public class AutoCommitTransaction implements ServerTransaction public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) { - TransactionLog.Transaction txn = null; + MessageStore.Transaction txn = null; try { if(message.isPersistent() && queue.isDurable()) @@ -155,8 +156,8 @@ public class AutoCommitTransaction implements ServerTransaction _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString()); } - txn = _transactionLog.newTransaction(); - txn.enqueueMessage(queue, message.getMessageNumber()); + txn = _messageStore.newTransaction(); + txn.enqueueMessage(queue, message); txn.commitTran(); txn = null; } @@ -176,15 +177,14 @@ public class AutoCommitTransaction implements ServerTransaction } - public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction) + public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime) { - TransactionLog.Transaction txn = null; + MessageStore.Transaction txn = null; try { if(message.isPersistent()) { - Long id = message.getMessageNumber(); for(BaseQueue queue : queues) { if (queue.isDurable()) @@ -195,22 +195,26 @@ public class AutoCommitTransaction implements ServerTransaction } if (txn == null) { - txn = _transactionLog.newTransaction(); + txn = _messageStore.newTransaction(); } - txn.enqueueMessage(queue, id); + txn.enqueueMessage(queue, message); + + } } - if (txn != null) - { - txn.commitTran(); - txn = null; - - } } + if (txn != null) + { + txn.commitTran(); + txn = null; + } + postTransactionAction.postCommit(); postTransactionAction = null; + + } catch (AMQException e) { @@ -225,6 +229,11 @@ public class AutoCommitTransaction implements ServerTransaction } + public void commit(final Runnable immediatePostTransactionAction) + { + immediatePostTransactionAction.run(); + } + public void commit() { } @@ -233,7 +242,7 @@ public class AutoCommitTransaction implements ServerTransaction { } - private void rollbackIfNecessary(Action postTransactionAction, TransactionLog.Transaction txn) + private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction txn) { if (txn != null) { |