summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java')
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java157
1 files changed, 139 insertions, 18 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 11401ebd65..3fbcff7e2c 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -1,4 +1,3 @@
-package org.apache.qpid.server.txn;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,7 +18,9 @@ package org.apache.qpid.server.txn;
* under the License.
*
*/
+package org.apache.qpid.server.txn;
+import org.apache.qpid.server.store.StoreFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +51,7 @@ public class LocalTransaction implements ServerTransaction
private volatile Transaction _transaction;
private MessageStore _transactionLog;
private long _txnStartTime = 0L;
+ private StoreFuture _asyncTran;
public LocalTransaction(MessageStore transactionLog)
{
@@ -68,11 +70,13 @@ public class LocalTransaction implements ServerTransaction
public void addPostTransactionAction(Action postTransactionAction)
{
+ sync();
_postTransactionActions.add(postTransactionAction);
}
public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
{
+ sync();
_postTransactionActions.add(postTransactionAction);
if(message.isPersistent() && queue.isDurable())
@@ -98,6 +102,7 @@ public class LocalTransaction implements ServerTransaction
public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
{
+ sync();
_postTransactionActions.add(postTransactionAction);
try
@@ -131,10 +136,7 @@ public class LocalTransaction implements ServerTransaction
{
try
{
- for(Action action : _postTransactionActions)
- {
- action.onRollback();
- }
+ doRollbackActions();
}
finally
{
@@ -151,7 +153,7 @@ public class LocalTransaction implements ServerTransaction
}
finally
{
- resetDetails();
+ resetDetails();
}
}
@@ -176,6 +178,7 @@ public class LocalTransaction implements ServerTransaction
public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
{
+ sync();
_postTransactionActions.add(postTransactionAction);
if(message.isPersistent() && queue.isDurable())
@@ -201,6 +204,7 @@ public class LocalTransaction implements ServerTransaction
public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
{
+ sync();
_postTransactionActions.add(postTransactionAction);
if (_txnStartTime == 0L)
@@ -239,11 +243,13 @@ public class LocalTransaction implements ServerTransaction
public void commit()
{
+ sync();
commit(null);
}
public void commit(Runnable immediateAction)
{
+ sync();
try
{
if(_transaction != null)
@@ -256,29 +262,137 @@ public class LocalTransaction implements ServerTransaction
immediateAction.run();
}
- for(int i = 0; i < _postTransactionActions.size(); i++)
+ doPostTransactionActions();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Failed to commit transaction", e);
+
+ doRollbackActions();
+ throw new RuntimeException("Failed to commit transaction", e);
+ }
+ finally
+ {
+ resetDetails();
+ }
+ }
+
+ private void doRollbackActions()
+ {
+ for(Action action : _postTransactionActions)
+ {
+ action.onRollback();
+ }
+ }
+
+ public StoreFuture commitAsync(final Runnable deferred)
+ {
+ sync();
+ try
+ {
+ StoreFuture future = StoreFuture.IMMEDIATE_FUTURE;
+ if(_transaction != null)
{
- _postTransactionActions.get(i).postCommit();
+ future = new StoreFuture()
+ {
+ private volatile boolean _completed = false;
+ private StoreFuture _underlying = _transaction.commitTranAsync();
+
+ @Override
+ public boolean isComplete()
+ {
+ return _completed || checkUnderlyingCompletion();
+ }
+
+ @Override
+ public void waitForCompletion()
+ {
+ if(!_completed)
+ {
+ _underlying.waitForCompletion();
+ checkUnderlyingCompletion();
+ }
+ }
+
+ private synchronized boolean checkUnderlyingCompletion()
+ {
+ if(!_completed && _underlying.isComplete())
+ {
+ completeDeferredWork();
+ _completed = true;
+ }
+ return _completed;
+
+ }
+
+ private void completeDeferredWork()
+ {
+ try
+ {
+ doPostTransactionActions();
+ deferred.run();
+
+ }
+ catch (Exception e)
+ {
+ _logger.error("Failed to commit transaction", e);
+
+ doRollbackActions();
+ throw new RuntimeException("Failed to commit transaction", e);
+ }
+ finally
+ {
+ resetDetails();
+ }
+ }
+
+ };
+ _asyncTran = future;
}
+ else
+ {
+ try
+ {
+ doPostTransactionActions();
+
+ deferred.run();
+ }
+ finally
+ {
+ resetDetails();
+ }
+ }
+
+ return future;
}
catch (Exception e)
{
_logger.error("Failed to commit transaction", e);
-
- for(Action action : _postTransactionActions)
+ try
{
- action.onRollback();
+ doRollbackActions();
+ }
+ finally
+ {
+ resetDetails();
}
throw new RuntimeException("Failed to commit transaction", e);
}
- finally
+
+
+ }
+
+ private void doPostTransactionActions()
+ {
+ for(int i = 0; i < _postTransactionActions.size(); i++)
{
- resetDetails();
+ _postTransactionActions.get(i).postCommit();
}
}
public void rollback()
{
+ sync();
try
{
if(_transaction != null)
@@ -295,10 +409,7 @@ public class LocalTransaction implements ServerTransaction
{
try
{
- for(Action action : _postTransactionActions)
- {
- action.onRollback();
- }
+ doRollbackActions();
}
finally
{
@@ -306,9 +417,19 @@ public class LocalTransaction implements ServerTransaction
}
}
}
-
+
+ public void sync()
+ {
+ if(_asyncTran != null)
+ {
+ _asyncTran.waitForCompletion();
+ _asyncTran = null;
+ }
+ }
+
private void resetDetails()
{
+ _asyncTran = null;
_transaction = null;
_postTransactionActions.clear();
_txnStartTime = 0L;