summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java')
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java24
1 files changed, 18 insertions, 6 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 349ec793fe..b800556312 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.txn;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,7 +33,7 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -53,7 +54,7 @@ public class LocalTransaction implements ServerTransaction
private final MessageStore _transactionLog;
private volatile long _txnStartTime = 0L;
private volatile long _txnUpdateTime = 0l;
- private StoreFuture _asyncTran;
+ private FutureResult _asyncTran;
public LocalTransaction(MessageStore transactionLog)
{
@@ -271,16 +272,16 @@ public class LocalTransaction implements ServerTransaction
}
}
- public StoreFuture commitAsync(final Runnable deferred)
+ public FutureResult commitAsync(final Runnable deferred)
{
sync();
- StoreFuture future = StoreFuture.IMMEDIATE_FUTURE;
+ FutureResult future = FutureResult.IMMEDIATE_FUTURE;
if(_transaction != null)
{
- future = new StoreFuture()
+ future = new FutureResult()
{
private volatile boolean _completed = false;
- private StoreFuture _underlying = _transaction.commitTranAsync();
+ private FutureResult _underlying = _transaction.commitTranAsync();
@Override
public boolean isComplete()
@@ -298,6 +299,17 @@ public class LocalTransaction implements ServerTransaction
}
}
+ @Override
+ public void waitForCompletion(final long timeout) throws TimeoutException
+ {
+
+ if(!_completed)
+ {
+ _underlying.waitForCompletion(timeout);
+ checkUnderlyingCompletion();
+ }
+ }
+
private synchronized boolean checkUnderlyingCompletion()
{
if(!_completed && _underlying.isComplete())