diff options
Diffstat (limited to 'java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java')
-rw-r--r-- | java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java | 234 |
1 files changed, 16 insertions, 218 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 9f7eb4bfd9..82bc3d8564 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -21,16 +21,12 @@ package org.apache.qpid.server.store.berkeleydb; import java.io.File; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; -import com.sleepycat.je.CheckpointConfig; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; @@ -46,39 +42,23 @@ import com.sleepycat.je.EnvironmentConfig; public class BDBMessageStore extends AbstractBDBMessageStore { private static final Logger LOGGER = Logger.getLogger(BDBMessageStore.class); - - private final CommitThread _commitThread = new CommitThread("Commit-Thread"); + private static final String BDB_STORE_TYPE = "BDB"; + private CommitThreadWrapper _commitThreadWrapper; @Override protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException { super.setupStore(storePath, name); - startCommitThread(); + _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment()); + _commitThreadWrapper.startCommitThread(); } protected Environment createEnvironment(File environmentPath) throws DatabaseException { LOGGER.info("BDB message store using environment path " + environmentPath.getAbsolutePath()); - EnvironmentConfig envConfig = new EnvironmentConfig(); - // This is what allows the creation of the store if it does not already exist. - envConfig.setAllowCreate(true); - envConfig.setTransactional(true); - envConfig.setConfigParam("je.lock.nLockTables", "7"); - - // Added to help diagnosis of Deadlock issue - // http://www.oracle.com/technology/products/berkeley-db/faq/je_faq.html#23 - if (Boolean.getBoolean("qpid.bdb.lock.debug")) - { - envConfig.setConfigParam("je.txn.deadlockStackTrace", "true"); - envConfig.setConfigParam("je.txn.dumpLocks", "true"); - } - - // Set transaction mode - _transactionConfig.setReadCommitted(true); + EnvironmentConfig envConfig = createEnvironmentConfig(); - //This prevents background threads running which will potentially update the store. - envConfig.setReadOnly(false); try { return new Environment(environmentPath, envConfig); @@ -98,12 +78,10 @@ public class BDBMessageStore extends AbstractBDBMessageStore } } - - @Override protected void closeInternal() throws Exception { - stopCommitThread(); + _commitThreadWrapper.stopCommitThread(); super.closeInternal(); } @@ -111,206 +89,26 @@ public class BDBMessageStore extends AbstractBDBMessageStore @Override protected StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException { - tx.commitNoSync(); - - BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit); - commitFuture.commit(); - - return commitFuture; - } - - private void startCommitThread() - { - _commitThread.start(); - } - - private void stopCommitThread() throws InterruptedException - { - _commitThread.close(); - _commitThread.join(); - } - - private static final class BDBCommitFuture implements StoreFuture - { - private final CommitThread _commitThread; - private final com.sleepycat.je.Transaction _tx; - private DatabaseException _databaseException; - private boolean _complete; - private boolean _syncCommit; - - public BDBCommitFuture(CommitThread commitThread, com.sleepycat.je.Transaction tx, boolean syncCommit) - { - _commitThread = commitThread; - _tx = tx; - _syncCommit = syncCommit; - } - - public synchronized void complete() + try { - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("public synchronized void complete(): called (Transaction = " + _tx + ")"); - } - _complete = true; - - notifyAll(); + tx.commitNoSync(); } - - public synchronized void abort(DatabaseException databaseException) + catch(DatabaseException de) { - _complete = true; - _databaseException = databaseException; + LOGGER.error("Got DatabaseException on commit, closing environment", de); - notifyAll(); - } - - public void commit() throws DatabaseException - { - _commitThread.addJob(this, _syncCommit); - - if(!_syncCommit) - { - LOGGER.debug("CommitAsync was requested, returning immediately."); - return; - } - - waitForCompletion(); - - if (_databaseException != null) - { - throw _databaseException; - } + closeEnvironmentSafely(); + throw de; } - public synchronized boolean isComplete() - { - return _complete; - } - - public synchronized void waitForCompletion() - { - while (!isComplete()) - { - _commitThread.explicitNotify(); - try - { - wait(250); - } - catch (InterruptedException e) - { - //TODO Should we ignore, or throw a 'StoreException'? - throw new RuntimeException(e); - } - } - } + return _commitThreadWrapper.commit(tx, syncCommit); } - /** - * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations - * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before - * continuing, but it is the responsibility of this thread to tell the commit operations when they have been - * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods. - * - * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table> - */ - private class CommitThread extends Thread + @Override + public String getStoreType() { - private final AtomicBoolean _stopped = new AtomicBoolean(false); - private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>(); - private final CheckpointConfig _config = new CheckpointConfig(); - private final Object _lock = new Object(); - - public CommitThread(String name) - { - super(name); - _config.setForce(true); - - } - - public void explicitNotify() - { - synchronized (_lock) - { - _lock.notify(); - } - } - - public void run() - { - while (!_stopped.get()) - { - synchronized (_lock) - { - while (!_stopped.get() && !hasJobs()) - { - try - { - // RHM-7 Periodically wake up and check, just in case we - // missed a notification. Don't want to lock the broker hard. - _lock.wait(1000); - } - catch (InterruptedException e) - { - } - } - } - processJobs(); - } - } - - private void processJobs() - { - int size = _jobQueue.size(); - - try - { - getEnvironment().flushLog(true); - - for(int i = 0; i < size; i++) - { - BDBCommitFuture commit = _jobQueue.poll(); - commit.complete(); - } - - } - catch (DatabaseException e) - { - for(int i = 0; i < size; i++) - { - BDBCommitFuture commit = _jobQueue.poll(); - commit.abort(e); - } - } - - } - - private boolean hasJobs() - { - return !_jobQueue.isEmpty(); - } - - public void addJob(BDBCommitFuture commit, final boolean sync) - { - - _jobQueue.add(commit); - if(sync) - { - synchronized (_lock) - { - _lock.notifyAll(); - } - } - } - - public void close() - { - synchronized (_lock) - { - _stopped.set(true); - _lock.notifyAll(); - } - } + return BDB_STORE_TYPE; } } |