summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java66
1 files changed, 51 insertions, 15 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
index 1f05dca41a..133a0ee7d9 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
@@ -22,16 +22,17 @@ package org.apache.qpid.server.store.berkeleydb;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
-
import com.sleepycat.je.CheckpointConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.Transaction;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.util.FutureResult;
public class CommitThreadWrapper
{
@@ -53,16 +54,16 @@ public class CommitThreadWrapper
_commitThread.join();
}
- public StoreFuture commit(Transaction tx, boolean syncCommit)
+ public FutureResult commit(Transaction tx, boolean syncCommit)
{
- BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
+ BDBCommitFutureResult commitFuture = new BDBCommitFutureResult(_commitThread, tx, syncCommit);
commitFuture.commit();
return commitFuture;
}
- private static final class BDBCommitFuture implements StoreFuture
+ private static final class BDBCommitFutureResult implements FutureResult
{
- private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class);
+ private static final Logger LOGGER = Logger.getLogger(BDBCommitFutureResult.class);
private final CommitThread _commitThread;
private final Transaction _tx;
@@ -70,7 +71,7 @@ public class CommitThreadWrapper
private boolean _complete;
private boolean _syncCommit;
- public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit)
+ public BDBCommitFutureResult(CommitThread commitThread, Transaction tx, boolean syncCommit)
{
_commitThread = commitThread;
_tx = tx;
@@ -150,13 +151,48 @@ public class CommitThreadWrapper
LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
}
}
+
+ @Override
+ public void waitForCompletion(final long timeout) throws TimeoutException
+ {
+ long startTime = System.currentTimeMillis();
+ long remaining = timeout;
+
+ while (!isComplete() && remaining > 0)
+ {
+ _commitThread.explicitNotify();
+ try
+ {
+ wait(remaining);
+ }
+ catch (InterruptedException e)
+ {
+ throw new StoreException(e);
+ }
+ if(!isComplete())
+ {
+ remaining = (startTime + timeout) - System.currentTimeMillis();
+ }
+ }
+
+ if(remaining < 0)
+ {
+ throw new TimeoutException("Commit did not complete within required timeout: " + timeout);
+ }
+
+ if(LOGGER.isDebugEnabled())
+ {
+ long duration = System.currentTimeMillis() - startTime;
+ LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
+ }
+ }
}
/**
- * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
+ * Implements a thread which batches and commits a queue of {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult} operations. The commit operations
* themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
* continuing, but it is the responsibility of this thread to tell the commit operations when they have been
- * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods.
+ * completed by calling back on their {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult#complete()} and {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult#abort} methods.
*
* <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table>
*/
@@ -165,7 +201,7 @@ public class CommitThreadWrapper
private static final Logger LOGGER = Logger.getLogger(CommitThread.class);
private final AtomicBoolean _stopped = new AtomicBoolean(false);
- private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
+ private final Queue<BDBCommitFutureResult> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFutureResult>();
private final CheckpointConfig _config = new CheckpointConfig();
private final Object _lock = new Object();
private Environment _environment;
@@ -230,7 +266,7 @@ public class CommitThreadWrapper
for(int i = 0; i < size; i++)
{
- BDBCommitFuture commit = _jobQueue.poll();
+ BDBCommitFutureResult commit = _jobQueue.poll();
commit.complete();
}
@@ -243,7 +279,7 @@ public class CommitThreadWrapper
for(int i = 0; i < size; i++)
{
- BDBCommitFuture commit = _jobQueue.poll();
+ BDBCommitFutureResult commit = _jobQueue.poll();
commit.abort(e);
}
}
@@ -268,7 +304,7 @@ public class CommitThreadWrapper
return !_jobQueue.isEmpty();
}
- public void addJob(BDBCommitFuture commit, final boolean sync)
+ public void addJob(BDBCommitFutureResult commit, final boolean sync)
{
_jobQueue.add(commit);