diff options
Diffstat (limited to 'qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java')
-rw-r--r-- | qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java | 61 |
1 files changed, 48 insertions, 13 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java index 964335869d..2a8cf92b3d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.store.berkeleydb; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import com.sleepycat.je.DatabaseException; @@ -29,7 +30,7 @@ import com.sleepycat.je.Environment; import com.sleepycat.je.Transaction; import org.apache.log4j.Logger; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; public class CoalescingCommiter implements Committer { @@ -65,16 +66,16 @@ public class CoalescingCommiter implements Committer } @Override - public StoreFuture commit(Transaction tx, boolean syncCommit) + public FutureResult commit(Transaction tx, boolean syncCommit) { - BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit); + BDBCommitFutureResult commitFuture = new BDBCommitFutureResult(_commitThread, tx, syncCommit); commitFuture.commit(); return commitFuture; } - private static final class BDBCommitFuture implements StoreFuture + private static final class BDBCommitFutureResult implements FutureResult { - private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class); + private static final Logger LOGGER = Logger.getLogger(BDBCommitFutureResult.class); private final CommitThread _commitThread; private final Transaction _tx; @@ -82,7 +83,7 @@ public class CoalescingCommiter implements Committer private RuntimeException _databaseException; private boolean _complete; - public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit) + public BDBCommitFutureResult(CommitThread commitThread, Transaction tx, boolean syncCommit) { _commitThread = commitThread; _tx = tx; @@ -162,13 +163,47 @@ public class CoalescingCommiter implements Committer LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx); } } + + public synchronized void waitForCompletion(long timeout) throws TimeoutException + { + long startTime= System.currentTimeMillis(); + long remaining = timeout; + + while (!isComplete() && remaining > 0) + { + _commitThread.explicitNotify(); + try + { + wait(remaining); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + if(!isComplete()) + { + remaining = (startTime + timeout) - System.currentTimeMillis(); + } + } + + if(remaining < 0l) + { + throw new TimeoutException("commit did not occur within given timeout period: " + timeout); + } + + if(LOGGER.isDebugEnabled()) + { + long duration = System.currentTimeMillis() - startTime; + LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx); + } + } } /** - * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations + * Implements a thread which batches and commits a queue of {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult} operations. The commit operations * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before * continuing, but it is the responsibility of this thread to tell the commit operations when they have been - * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods. + * completed by calling back on their {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult#complete()} and {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult#abort} methods. * * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table> */ @@ -177,7 +212,7 @@ public class CoalescingCommiter implements Committer private static final Logger LOGGER = Logger.getLogger(CommitThread.class); private final AtomicBoolean _stopped = new AtomicBoolean(false); - private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>(); + private final Queue<BDBCommitFutureResult> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFutureResult>(); private final Object _lock = new Object(); private final EnvironmentFacade _environmentFacade; @@ -244,7 +279,7 @@ public class CoalescingCommiter implements Committer for(int i = 0; i < size; i++) { - BDBCommitFuture commit = _jobQueue.poll(); + BDBCommitFutureResult commit = _jobQueue.poll(); if (commit == null) { break; @@ -261,7 +296,7 @@ public class CoalescingCommiter implements Committer for(int i = 0; i < size; i++) { - BDBCommitFuture commit = _jobQueue.poll(); + BDBCommitFutureResult commit = _jobQueue.poll(); if (commit == null) { break; @@ -290,7 +325,7 @@ public class CoalescingCommiter implements Committer return !_jobQueue.isEmpty(); } - public void addJob(BDBCommitFuture commit, final boolean sync) + public void addJob(BDBCommitFutureResult commit, final boolean sync) { if (_stopped.get()) { @@ -313,7 +348,7 @@ public class CoalescingCommiter implements Committer { _stopped.set(true); Environment environment = _environmentFacade.getEnvironment(); - BDBCommitFuture commit; + BDBCommitFutureResult commit; if (environment != null && environment.isValid()) { environment.flushLog(true); |