diff options
Diffstat (limited to 'qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java')
-rw-r--r-- | qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java | 66 |
1 files changed, 51 insertions, 15 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java index 1f05dca41a..133a0ee7d9 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java @@ -22,16 +22,17 @@ package org.apache.qpid.server.store.berkeleydb; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.log4j.Logger; -import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; - import com.sleepycat.je.CheckpointConfig; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Environment; import com.sleepycat.je.Transaction; +import org.apache.log4j.Logger; + +import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.util.FutureResult; public class CommitThreadWrapper { @@ -53,16 +54,16 @@ public class CommitThreadWrapper _commitThread.join(); } - public StoreFuture commit(Transaction tx, boolean syncCommit) + public FutureResult commit(Transaction tx, boolean syncCommit) { - BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit); + BDBCommitFutureResult commitFuture = new BDBCommitFutureResult(_commitThread, tx, syncCommit); commitFuture.commit(); return commitFuture; } - private static final class BDBCommitFuture implements StoreFuture + private static final class BDBCommitFutureResult implements FutureResult { - private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class); + private static final Logger LOGGER = Logger.getLogger(BDBCommitFutureResult.class); private final CommitThread _commitThread; private final Transaction _tx; @@ -70,7 +71,7 @@ public class CommitThreadWrapper private boolean _complete; private boolean _syncCommit; - public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit) + public BDBCommitFutureResult(CommitThread commitThread, Transaction tx, boolean syncCommit) { _commitThread = commitThread; _tx = tx; @@ -150,13 +151,48 @@ public class CommitThreadWrapper LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx); } } + + @Override + public void waitForCompletion(final long timeout) throws TimeoutException + { + long startTime = System.currentTimeMillis(); + long remaining = timeout; + + while (!isComplete() && remaining > 0) + { + _commitThread.explicitNotify(); + try + { + wait(remaining); + } + catch (InterruptedException e) + { + throw new StoreException(e); + } + if(!isComplete()) + { + remaining = (startTime + timeout) - System.currentTimeMillis(); + } + } + + if(remaining < 0) + { + throw new TimeoutException("Commit did not complete within required timeout: " + timeout); + } + + if(LOGGER.isDebugEnabled()) + { + long duration = System.currentTimeMillis() - startTime; + LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx); + } + } } /** - * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations + * Implements a thread which batches and commits a queue of {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult} operations. The commit operations * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before * continuing, but it is the responsibility of this thread to tell the commit operations when they have been - * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods. + * completed by calling back on their {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult#complete()} and {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult#abort} methods. * * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table> */ @@ -165,7 +201,7 @@ public class CommitThreadWrapper private static final Logger LOGGER = Logger.getLogger(CommitThread.class); private final AtomicBoolean _stopped = new AtomicBoolean(false); - private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>(); + private final Queue<BDBCommitFutureResult> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFutureResult>(); private final CheckpointConfig _config = new CheckpointConfig(); private final Object _lock = new Object(); private Environment _environment; @@ -230,7 +266,7 @@ public class CommitThreadWrapper for(int i = 0; i < size; i++) { - BDBCommitFuture commit = _jobQueue.poll(); + BDBCommitFutureResult commit = _jobQueue.poll(); commit.complete(); } @@ -243,7 +279,7 @@ public class CommitThreadWrapper for(int i = 0; i < size; i++) { - BDBCommitFuture commit = _jobQueue.poll(); + BDBCommitFutureResult commit = _jobQueue.poll(); commit.abort(e); } } @@ -268,7 +304,7 @@ public class CommitThreadWrapper return !_jobQueue.isEmpty(); } - public void addJob(BDBCommitFuture commit, final boolean sync) + public void addJob(BDBCommitFutureResult commit, final boolean sync) { _jobQueue.add(commit); |