diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2012-05-07 22:40:52 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2012-05-07 22:40:52 +0000 |
| commit | 3fbc2d6b53db6c3bfcd07ff191c80963c9259224 (patch) | |
| tree | 189297204dc500b70a4edc018fad4ef10dfeae4f /java/bdbstore/src | |
| parent | 173dbeeca9731f8a97e47cbd3b27235edadcdd82 (diff) | |
| download | qpid-python-3fbc2d6b53db6c3bfcd07ff191c80963c9259224.tar.gz | |
QPID-3986 : [Java Broker] Add producer flow control based on total disk usage
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1335290 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/bdbstore/src')
2 files changed, 147 insertions, 15 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index fb1d7c5265..c60e9d14f2 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.store.berkeleydb; import com.sleepycat.bind.tuple.ByteBinding; import com.sleepycat.bind.tuple.LongBinding; +import com.sleepycat.je.CheckpointConfig; import com.sleepycat.je.Cursor; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; @@ -58,6 +59,7 @@ import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecovery import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; import org.apache.qpid.server.store.ConfiguredObjectHelper; +import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; @@ -74,7 +76,6 @@ import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; import org.apache.qpid.server.store.berkeleydb.entry.Xid; @@ -97,6 +98,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore public static final int VERSION = 6; public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path"; + public static final String OVERFULL_SIZE_PROPERTY = "overfull-size"; + public static final String UNDERFULL_SIZE_PROPERTY = "underfull-size"; private Environment _environment; @@ -152,7 +155,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore private TransactionLogRecoveryHandler _tlogRecoveryHandler; private ConfigurationRecoveryHandler _configRecoveryHandler; - + + private long _totalStoreSize; + private boolean _limitBusted; + private long _persistentSizeLowThreshold; + private long _persistentSizeHighThreshold; + private final EventManager _eventManager = new EventManager(); private String _storeLocation; @@ -163,6 +171,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore _stateManager = new StateManager(_eventManager); } + @Override + public void addEventListener(EventListener eventListener, Event... events) + { + _eventManager.addEventListener(eventListener, events); + } + public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, Configuration storeConfiguration) throws Exception @@ -171,7 +185,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore _configRecoveryHandler = recoveryHandler; - configure(name,storeConfiguration); + configure(name, storeConfiguration); @@ -218,6 +232,13 @@ public abstract class AbstractBDBMessageStore implements MessageStore final String storeLocation = storeConfig.getString(ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK") + File.separator + "bdbstore" + File.separator + name); + _persistentSizeHighThreshold = storeConfig.getLong(OVERFULL_SIZE_PROPERTY, Long.MAX_VALUE); + _persistentSizeLowThreshold = storeConfig.getLong(UNDERFULL_SIZE_PROPERTY, _persistentSizeHighThreshold); + if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) + { + _persistentSizeLowThreshold = _persistentSizeLowThreshold; + } + File environmentPath = new File(storeLocation); if (!environmentPath.exists()) { @@ -235,6 +256,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore setupStore(environmentPath, name); } + @Override + public String getStoreLocation() + { + return _storeLocation; + } + /** * Move the store state from INITIAL to ACTIVE without actually recovering. * @@ -257,6 +284,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore new Upgrader(_environment, name).upgradeIfNecessary(); openDatabases(); + + _totalStoreSize = getSizeOnDisk(); } protected Environment createEnvironment(File environmentPath) throws DatabaseException @@ -553,6 +582,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore StorableMessageMetaData metaData = valueBinding.entryToObject(value); StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, false); + mrh.message(message); maxId = Math.max(maxId, messageId); @@ -953,7 +983,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore UUIDTupleBinding.getInstance().objectToEntry(link.getId(), key); DatabaseEntry value = new DatabaseEntry(); - LongBinding.longToEntry(link.getCreateTime(),value); + LongBinding.longToEntry(link.getCreateTime(), value); StringMapBinding.getInstance().objectToEntry(link.getArguments(), value); try @@ -1247,7 +1277,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore /** * Primarily for testing purposes. * - * @param queueName + * @param queueId * * @return a list of message ids for messages enqueued for a particular queue */ @@ -1698,7 +1728,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore synchronized void store(com.sleepycat.je.Transaction txn) { - if(_metaData != null) + if(unstored()) { try { @@ -1728,14 +1758,19 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } + private boolean unstored() + { + return _metaData != null; + } + public synchronized StoreFuture flushToStore() { - if(_metaData != null) + if(unstored()) { com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null); store(txn); AbstractBDBMessageStore.this.commit(txn,true); - + storedSizeChange(getMetaData().getContentSize()); } return StoreFuture.IMMEDIATE_FUTURE; } @@ -1744,7 +1779,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore { try { + int delta = getMetaData().getContentSize(); AbstractBDBMessageStore.this.removeMessage(_messageId, false); + storedSizeChange(-delta); + } catch (AMQStoreException e) { @@ -1756,6 +1794,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore private class BDBTransaction implements org.apache.qpid.server.store.Transaction { private com.sleepycat.je.Transaction _txn; + private int _storeSizeIncrease; private BDBTransaction() { @@ -1773,7 +1812,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore { if(message.getStoredMessage() instanceof StoredBDBMessage) { - ((StoredBDBMessage)message.getStoredMessage()).store(_txn); + final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage(); + storedMessage.store(_txn); + _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); } AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber()); @@ -1787,10 +1828,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore public void commitTran() throws AMQStoreException { AbstractBDBMessageStore.this.commitTranImpl(_txn, true); + AbstractBDBMessageStore.this.storedSizeChange(_storeSizeIncrease); } public StoreFuture commitTranAsync() throws AMQStoreException { + AbstractBDBMessageStore.this.storedSizeChange(_storeSizeIncrease); return AbstractBDBMessageStore.this.commitTranImpl(_txn, false); } @@ -1811,15 +1854,84 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - @Override - public void addEventListener(EventListener eventListener, Event... events) + private void storedSizeChange(final int delta) { - _eventManager.addEventListener(eventListener, events); + if(getPersistentSizeHighThreshold() > 0) + { + synchronized (this) + { + // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every + // time, so we do so only when there's been enough change that it is worth looking again. We do this by + // assuming the total size will change by less than twice the amount of the message data change. + long newSize = _totalStoreSize += 2*delta; + + if(!_limitBusted && newSize > getPersistentSizeHighThreshold()) + { + _totalStoreSize = getSizeOnDisk(); + + if(_totalStoreSize > getPersistentSizeHighThreshold()) + { + _limitBusted = true; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); + } + } + else if(_limitBusted && newSize < getPersistentSizeLowThreshold()) + { + long oldSize = _totalStoreSize; + _totalStoreSize = getSizeOnDisk(); + + if(oldSize <= _totalStoreSize) + { + + reduceSizeOnDisk(); + + _totalStoreSize = getSizeOnDisk(); + + } + + if(_totalStoreSize < getPersistentSizeLowThreshold()) + { + _limitBusted = false; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); + } + + + } + } + } } - @Override - public String getStoreLocation() + private void reduceSizeOnDisk() { - return _storeLocation; + _environment.getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false"); + boolean cleaned = false; + while (_environment.cleanLog() > 0) + { + cleaned = true; + } + if (cleaned) + { + CheckpointConfig force = new CheckpointConfig(); + force.setForce(true); + _environment.checkpoint(force); + } + + + _environment.getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true"); + } + + private long getSizeOnDisk() + { + return _environment.getStats(null).getTotalLogSize(); + } + + private long getPersistentSizeLowThreshold() + { + return _persistentSizeLowThreshold; + } + + private long getPersistentSizeHighThreshold() + { + return _persistentSizeHighThreshold; } } diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 9f7eb4bfd9..b414441b92 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -21,12 +21,19 @@ package org.apache.qpid.server.store.berkeleydb; import java.io.File; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.store.Event; +import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; @@ -49,6 +56,8 @@ public class BDBMessageStore extends AbstractBDBMessageStore private final CommitThread _commitThread = new CommitThread("Commit-Thread"); + private final Map<Event, List<EventListener>> _eventListeners = new HashMap<Event, List<EventListener>>(); + @Override protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException { @@ -64,6 +73,17 @@ public class BDBMessageStore extends AbstractBDBMessageStore // This is what allows the creation of the store if it does not already exist. envConfig.setAllowCreate(true); envConfig.setTransactional(true); + + Properties props = System.getProperties(); + + for(String propName : props.stringPropertyNames()) + { + if(propName.startsWith("qpid.bdb.envconfig.je.")) + { + envConfig.setConfigParam(propName.substring(19), props.getProperty(propName)); + } + } + envConfig.setConfigParam("je.lock.nLockTables", "7"); // Added to help diagnosis of Deadlock issue |
