diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java')
-rwxr-xr-x | java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java | 157 |
1 files changed, 139 insertions, 18 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index 11401ebd65..3fbcff7e2c 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -1,4 +1,3 @@ -package org.apache.qpid.server.txn; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,7 +18,9 @@ package org.apache.qpid.server.txn; * under the License. * */ +package org.apache.qpid.server.txn; +import org.apache.qpid.server.store.StoreFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +51,7 @@ public class LocalTransaction implements ServerTransaction private volatile Transaction _transaction; private MessageStore _transactionLog; private long _txnStartTime = 0L; + private StoreFuture _asyncTran; public LocalTransaction(MessageStore transactionLog) { @@ -68,11 +70,13 @@ public class LocalTransaction implements ServerTransaction public void addPostTransactionAction(Action postTransactionAction) { + sync(); _postTransactionActions.add(postTransactionAction); } public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) { + sync(); _postTransactionActions.add(postTransactionAction); if(message.isPersistent() && queue.isDurable()) @@ -98,6 +102,7 @@ public class LocalTransaction implements ServerTransaction public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction) { + sync(); _postTransactionActions.add(postTransactionAction); try @@ -131,10 +136,7 @@ public class LocalTransaction implements ServerTransaction { try { - for(Action action : _postTransactionActions) - { - action.onRollback(); - } + doRollbackActions(); } finally { @@ -151,7 +153,7 @@ public class LocalTransaction implements ServerTransaction } finally { - resetDetails(); + resetDetails(); } } @@ -176,6 +178,7 @@ public class LocalTransaction implements ServerTransaction public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction) { + sync(); _postTransactionActions.add(postTransactionAction); if(message.isPersistent() && queue.isDurable()) @@ -201,6 +204,7 @@ public class LocalTransaction implements ServerTransaction public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime) { + sync(); _postTransactionActions.add(postTransactionAction); if (_txnStartTime == 0L) @@ -239,11 +243,13 @@ public class LocalTransaction implements ServerTransaction public void commit() { + sync(); commit(null); } public void commit(Runnable immediateAction) { + sync(); try { if(_transaction != null) @@ -256,29 +262,137 @@ public class LocalTransaction implements ServerTransaction immediateAction.run(); } - for(int i = 0; i < _postTransactionActions.size(); i++) + doPostTransactionActions(); + } + catch (Exception e) + { + _logger.error("Failed to commit transaction", e); + + doRollbackActions(); + throw new RuntimeException("Failed to commit transaction", e); + } + finally + { + resetDetails(); + } + } + + private void doRollbackActions() + { + for(Action action : _postTransactionActions) + { + action.onRollback(); + } + } + + public StoreFuture commitAsync(final Runnable deferred) + { + sync(); + try + { + StoreFuture future = StoreFuture.IMMEDIATE_FUTURE; + if(_transaction != null) { - _postTransactionActions.get(i).postCommit(); + future = new StoreFuture() + { + private volatile boolean _completed = false; + private StoreFuture _underlying = _transaction.commitTranAsync(); + + @Override + public boolean isComplete() + { + return _completed || checkUnderlyingCompletion(); + } + + @Override + public void waitForCompletion() + { + if(!_completed) + { + _underlying.waitForCompletion(); + checkUnderlyingCompletion(); + } + } + + private synchronized boolean checkUnderlyingCompletion() + { + if(!_completed && _underlying.isComplete()) + { + completeDeferredWork(); + _completed = true; + } + return _completed; + + } + + private void completeDeferredWork() + { + try + { + doPostTransactionActions(); + deferred.run(); + + } + catch (Exception e) + { + _logger.error("Failed to commit transaction", e); + + doRollbackActions(); + throw new RuntimeException("Failed to commit transaction", e); + } + finally + { + resetDetails(); + } + } + + }; + _asyncTran = future; } + else + { + try + { + doPostTransactionActions(); + + deferred.run(); + } + finally + { + resetDetails(); + } + } + + return future; } catch (Exception e) { _logger.error("Failed to commit transaction", e); - - for(Action action : _postTransactionActions) + try { - action.onRollback(); + doRollbackActions(); + } + finally + { + resetDetails(); } throw new RuntimeException("Failed to commit transaction", e); } - finally + + + } + + private void doPostTransactionActions() + { + for(int i = 0; i < _postTransactionActions.size(); i++) { - resetDetails(); + _postTransactionActions.get(i).postCommit(); } } public void rollback() { + sync(); try { if(_transaction != null) @@ -295,10 +409,7 @@ public class LocalTransaction implements ServerTransaction { try { - for(Action action : _postTransactionActions) - { - action.onRollback(); - } + doRollbackActions(); } finally { @@ -306,9 +417,19 @@ public class LocalTransaction implements ServerTransaction } } } - + + public void sync() + { + if(_asyncTran != null) + { + _asyncTran.waitForCompletion(); + _asyncTran = null; + } + } + private void resetDetails() { + _asyncTran = null; _transaction = null; _postTransactionActions.clear(); _txnStartTime = 0L; |