summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java')
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java61
1 files changed, 35 insertions, 26 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
index 36e9d78440..a67d4badd1 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.txn;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -30,7 +31,7 @@ import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.MessageStore;
/**
* An implementation of ServerTransaction where each enqueue/dequeue
@@ -43,11 +44,11 @@ public class AutoCommitTransaction implements ServerTransaction
{
protected static final Logger _logger = Logger.getLogger(AutoCommitTransaction.class);
- private final TransactionLog _transactionLog;
+ private final MessageStore _messageStore;
- public AutoCommitTransaction(TransactionLog transactionLog)
+ public AutoCommitTransaction(MessageStore transactionLog)
{
- _transactionLog = transactionLog;
+ _messageStore = transactionLog;
}
public long getTransactionStartTime()
@@ -59,14 +60,14 @@ public class AutoCommitTransaction implements ServerTransaction
* Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered
* by the caller are executed immediately.
*/
- public void addPostTransactionAction(Action immediateAction)
+ public void addPostTransactionAction(final Action immediateAction)
{
immediateAction.postCommit();
}
public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
{
- TransactionLog.Transaction txn = null;
+ MessageStore.Transaction txn = null;
try
{
if(message.isPersistent() && queue.isDurable())
@@ -76,8 +77,8 @@ public class AutoCommitTransaction implements ServerTransaction
_logger.debug("Dequeue of message number " + message.getMessageNumber() + " from transaction log. Queue : " + queue.getNameShortString());
}
- txn = _transactionLog.newTransaction();
- txn.dequeueMessage(queue, message.getMessageNumber());
+ txn = _messageStore.newTransaction();
+ txn.dequeueMessage(queue, message);
txn.commitTran();
txn = null;
}
@@ -98,7 +99,7 @@ public class AutoCommitTransaction implements ServerTransaction
public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
{
- TransactionLog.Transaction txn = null;
+ MessageStore.Transaction txn = null;
try
{
for(QueueEntry entry : queueEntries)
@@ -115,10 +116,10 @@ public class AutoCommitTransaction implements ServerTransaction
if(txn == null)
{
- txn = _transactionLog.newTransaction();
+ txn = _messageStore.newTransaction();
}
- txn.dequeueMessage(queue, message.getMessageNumber());
+ txn.dequeueMessage(queue, message);
}
}
@@ -145,7 +146,7 @@ public class AutoCommitTransaction implements ServerTransaction
public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
{
- TransactionLog.Transaction txn = null;
+ MessageStore.Transaction txn = null;
try
{
if(message.isPersistent() && queue.isDurable())
@@ -155,8 +156,8 @@ public class AutoCommitTransaction implements ServerTransaction
_logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
}
- txn = _transactionLog.newTransaction();
- txn.enqueueMessage(queue, message.getMessageNumber());
+ txn = _messageStore.newTransaction();
+ txn.enqueueMessage(queue, message);
txn.commitTran();
txn = null;
}
@@ -176,15 +177,14 @@ public class AutoCommitTransaction implements ServerTransaction
}
- public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction)
+ public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
{
- TransactionLog.Transaction txn = null;
+ MessageStore.Transaction txn = null;
try
{
if(message.isPersistent())
{
- Long id = message.getMessageNumber();
for(BaseQueue queue : queues)
{
if (queue.isDurable())
@@ -195,22 +195,26 @@ public class AutoCommitTransaction implements ServerTransaction
}
if (txn == null)
{
- txn = _transactionLog.newTransaction();
+ txn = _messageStore.newTransaction();
}
- txn.enqueueMessage(queue, id);
+ txn.enqueueMessage(queue, message);
+
+
}
}
- if (txn != null)
- {
- txn.commitTran();
- txn = null;
-
- }
}
+ if (txn != null)
+ {
+ txn.commitTran();
+ txn = null;
+ }
+
postTransactionAction.postCommit();
postTransactionAction = null;
+
+
}
catch (AMQException e)
{
@@ -225,6 +229,11 @@ public class AutoCommitTransaction implements ServerTransaction
}
+ public void commit(final Runnable immediatePostTransactionAction)
+ {
+ immediatePostTransactionAction.run();
+ }
+
public void commit()
{
}
@@ -233,7 +242,7 @@ public class AutoCommitTransaction implements ServerTransaction
{
}
- private void rollbackIfNecessary(Action postTransactionAction, TransactionLog.Transaction txn)
+ private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction txn)
{
if (txn != null)
{