diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn')
2 files changed, 32 insertions, 20 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java index 65064b015c..809c234cc6 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java @@ -30,7 +30,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; @@ -55,7 +55,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction public static interface FutureRecorder { - public void recordFuture(StoreFuture future, Action action); + public void recordFuture(FutureResult future, Action action); } @@ -83,7 +83,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction */ public void addPostTransactionAction(final Action immediateAction) { - addFuture(StoreFuture.IMMEDIATE_FUTURE, immediateAction); + addFuture(FutureResult.IMMEDIATE_FUTURE, immediateAction); } @@ -92,7 +92,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction Transaction txn = null; try { - StoreFuture future; + FutureResult future; if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) @@ -108,7 +108,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } else { - future = StoreFuture.IMMEDIATE_FUTURE; + future = FutureResult.IMMEDIATE_FUTURE; } addFuture(future, postTransactionAction); postTransactionAction = null; @@ -120,7 +120,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } - private void addFuture(final StoreFuture future, final Action action) + private void addFuture(final FutureResult future, final Action action) { if(action != null) { @@ -135,7 +135,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } } - private void addEnqueueFuture(final StoreFuture future, final Action action, boolean persistent) + private void addEnqueueFuture(final FutureResult future, final Action action, boolean persistent) { if(action != null) { @@ -178,7 +178,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } } - StoreFuture future; + FutureResult future; if(txn != null) { future = txn.commitTranAsync(); @@ -186,7 +186,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } else { - future = StoreFuture.IMMEDIATE_FUTURE; + future = FutureResult.IMMEDIATE_FUTURE; } addFuture(future, postTransactionAction); postTransactionAction = null; @@ -204,7 +204,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction Transaction txn = null; try { - StoreFuture future; + FutureResult future; if(queue.getMessageDurability().persist(message.isPersistent())) { if (_logger.isDebugEnabled()) @@ -219,7 +219,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } else { - future = StoreFuture.IMMEDIATE_FUTURE; + future = FutureResult.IMMEDIATE_FUTURE; } addEnqueueFuture(future, postTransactionAction, message.isPersistent()); postTransactionAction = null; @@ -255,7 +255,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } } - StoreFuture future; + FutureResult future; if (txn != null) { future = txn.commitTranAsync(); @@ -263,7 +263,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction } else { - future = StoreFuture.IMMEDIATE_FUTURE; + future = FutureResult.IMMEDIATE_FUTURE; } addEnqueueFuture(future, postTransactionAction, message.isPersistent()); postTransactionAction = null; @@ -281,7 +281,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction { if(immediatePostTransactionAction != null) { - addFuture(StoreFuture.IMMEDIATE_FUTURE, new Action() + addFuture(FutureResult.IMMEDIATE_FUTURE, new Action() { public void postCommit() { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index 349ec793fe..b800556312 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.txn; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +33,7 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; @@ -53,7 +54,7 @@ public class LocalTransaction implements ServerTransaction private final MessageStore _transactionLog; private volatile long _txnStartTime = 0L; private volatile long _txnUpdateTime = 0l; - private StoreFuture _asyncTran; + private FutureResult _asyncTran; public LocalTransaction(MessageStore transactionLog) { @@ -271,16 +272,16 @@ public class LocalTransaction implements ServerTransaction } } - public StoreFuture commitAsync(final Runnable deferred) + public FutureResult commitAsync(final Runnable deferred) { sync(); - StoreFuture future = StoreFuture.IMMEDIATE_FUTURE; + FutureResult future = FutureResult.IMMEDIATE_FUTURE; if(_transaction != null) { - future = new StoreFuture() + future = new FutureResult() { private volatile boolean _completed = false; - private StoreFuture _underlying = _transaction.commitTranAsync(); + private FutureResult _underlying = _transaction.commitTranAsync(); @Override public boolean isComplete() @@ -298,6 +299,17 @@ public class LocalTransaction implements ServerTransaction } } + @Override + public void waitForCompletion(final long timeout) throws TimeoutException + { + + if(!_completed) + { + _underlying.waitForCompletion(timeout); + checkUnderlyingCompletion(); + } + } + private synchronized boolean checkUnderlyingCompletion() { if(!_completed && _underlying.isComplete()) |