/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.server.store.berkeleydb; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.Logger; import org.apache.qpid.server.store.StoreFuture; import com.sleepycat.je.CheckpointConfig; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Environment; import com.sleepycat.je.Transaction; public class CommitThreadWrapper { private final CommitThread _commitThread; public CommitThreadWrapper(String name, Environment env) { _commitThread = new CommitThread(name, env); } public void startCommitThread() { _commitThread.start(); } public void stopCommitThread() throws InterruptedException { _commitThread.close(); _commitThread.join(); } public StoreFuture commit(Transaction tx, boolean syncCommit) { BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit); commitFuture.commit(); return commitFuture; } private static final class BDBCommitFuture implements StoreFuture { private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class); private final CommitThread _commitThread; private final Transaction _tx; private DatabaseException _databaseException; private boolean _complete; private boolean _syncCommit; public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit) { _commitThread = commitThread; _tx = tx; _syncCommit = syncCommit; } public synchronized void complete() { if (LOGGER.isDebugEnabled()) { LOGGER.debug("complete() called for transaction " + _tx); } _complete = true; notifyAll(); } public synchronized void abort(DatabaseException databaseException) { _complete = true; _databaseException = databaseException; notifyAll(); } public void commit() throws DatabaseException { _commitThread.addJob(this, _syncCommit); if(!_syncCommit) { if(LOGGER.isDebugEnabled()) { LOGGER.debug("CommitAsync was requested, returning immediately."); } return; } waitForCompletion(); if (_databaseException != null) { throw _databaseException; } } public synchronized boolean isComplete() { return _complete; } public synchronized void waitForCompletion() { long startTime = 0; if(LOGGER.isDebugEnabled()) { startTime = System.currentTimeMillis(); } while (!isComplete()) { _commitThread.explicitNotify(); try { wait(250); } catch (InterruptedException e) { throw new RuntimeException(e); } } if(LOGGER.isDebugEnabled()) { long duration = System.currentTimeMillis() - startTime; LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx); } } } /** * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before * continuing, but it is the responsibility of this thread to tell the commit operations when they have been * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods. * *
Responsibilities | Collaborations |
---|