summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn')
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java28
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java24
2 files changed, 32 insertions, 20 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
index 65064b015c..809c234cc6 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
@@ -30,7 +30,7 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -55,7 +55,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
public static interface FutureRecorder
{
- public void recordFuture(StoreFuture future, Action action);
+ public void recordFuture(FutureResult future, Action action);
}
@@ -83,7 +83,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
*/
public void addPostTransactionAction(final Action immediateAction)
{
- addFuture(StoreFuture.IMMEDIATE_FUTURE, immediateAction);
+ addFuture(FutureResult.IMMEDIATE_FUTURE, immediateAction);
}
@@ -92,7 +92,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
Transaction txn = null;
try
{
- StoreFuture future;
+ FutureResult future;
if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
@@ -108,7 +108,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
else
{
- future = StoreFuture.IMMEDIATE_FUTURE;
+ future = FutureResult.IMMEDIATE_FUTURE;
}
addFuture(future, postTransactionAction);
postTransactionAction = null;
@@ -120,7 +120,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
- private void addFuture(final StoreFuture future, final Action action)
+ private void addFuture(final FutureResult future, final Action action)
{
if(action != null)
{
@@ -135,7 +135,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
}
- private void addEnqueueFuture(final StoreFuture future, final Action action, boolean persistent)
+ private void addEnqueueFuture(final FutureResult future, final Action action, boolean persistent)
{
if(action != null)
{
@@ -178,7 +178,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
}
- StoreFuture future;
+ FutureResult future;
if(txn != null)
{
future = txn.commitTranAsync();
@@ -186,7 +186,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
else
{
- future = StoreFuture.IMMEDIATE_FUTURE;
+ future = FutureResult.IMMEDIATE_FUTURE;
}
addFuture(future, postTransactionAction);
postTransactionAction = null;
@@ -204,7 +204,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
Transaction txn = null;
try
{
- StoreFuture future;
+ FutureResult future;
if(queue.getMessageDurability().persist(message.isPersistent()))
{
if (_logger.isDebugEnabled())
@@ -219,7 +219,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
else
{
- future = StoreFuture.IMMEDIATE_FUTURE;
+ future = FutureResult.IMMEDIATE_FUTURE;
}
addEnqueueFuture(future, postTransactionAction, message.isPersistent());
postTransactionAction = null;
@@ -255,7 +255,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
}
- StoreFuture future;
+ FutureResult future;
if (txn != null)
{
future = txn.commitTranAsync();
@@ -263,7 +263,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
}
else
{
- future = StoreFuture.IMMEDIATE_FUTURE;
+ future = FutureResult.IMMEDIATE_FUTURE;
}
addEnqueueFuture(future, postTransactionAction, message.isPersistent());
postTransactionAction = null;
@@ -281,7 +281,7 @@ public class AsyncAutoCommitTransaction implements ServerTransaction
{
if(immediatePostTransactionAction != null)
{
- addFuture(StoreFuture.IMMEDIATE_FUTURE, new Action()
+ addFuture(FutureResult.IMMEDIATE_FUTURE, new Action()
{
public void postCommit()
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 349ec793fe..b800556312 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.txn;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,7 +33,7 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -53,7 +54,7 @@ public class LocalTransaction implements ServerTransaction
private final MessageStore _transactionLog;
private volatile long _txnStartTime = 0L;
private volatile long _txnUpdateTime = 0l;
- private StoreFuture _asyncTran;
+ private FutureResult _asyncTran;
public LocalTransaction(MessageStore transactionLog)
{
@@ -271,16 +272,16 @@ public class LocalTransaction implements ServerTransaction
}
}
- public StoreFuture commitAsync(final Runnable deferred)
+ public FutureResult commitAsync(final Runnable deferred)
{
sync();
- StoreFuture future = StoreFuture.IMMEDIATE_FUTURE;
+ FutureResult future = FutureResult.IMMEDIATE_FUTURE;
if(_transaction != null)
{
- future = new StoreFuture()
+ future = new FutureResult()
{
private volatile boolean _completed = false;
- private StoreFuture _underlying = _transaction.commitTranAsync();
+ private FutureResult _underlying = _transaction.commitTranAsync();
@Override
public boolean isComplete()
@@ -298,6 +299,17 @@ public class LocalTransaction implements ServerTransaction
}
}
+ @Override
+ public void waitForCompletion(final long timeout) throws TimeoutException
+ {
+
+ if(!_completed)
+ {
+ _underlying.waitForCompletion(timeout);
+ checkUnderlyingCompletion();
+ }
+ }
+
private synchronized boolean checkUnderlyingCompletion()
{
if(!_completed && _underlying.isComplete())