summaryrefslogtreecommitdiff
path: root/java/bdbstore/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-05-07 22:40:52 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-05-07 22:40:52 +0000
commit3fbc2d6b53db6c3bfcd07ff191c80963c9259224 (patch)
tree189297204dc500b70a4edc018fad4ef10dfeae4f /java/bdbstore/src
parent173dbeeca9731f8a97e47cbd3b27235edadcdd82 (diff)
downloadqpid-python-3fbc2d6b53db6c3bfcd07ff191c80963c9259224.tar.gz
QPID-3986 : [Java Broker] Add producer flow control based on total disk usage
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1335290 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/bdbstore/src')
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java142
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java20
2 files changed, 147 insertions, 15 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index fb1d7c5265..c60e9d14f2 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store.berkeleydb;
import com.sleepycat.bind.tuple.ByteBinding;
import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.CheckpointConfig;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
@@ -58,6 +59,7 @@ import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecovery
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
import org.apache.qpid.server.store.ConfiguredObjectHelper;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
@@ -74,7 +76,6 @@ import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
import org.apache.qpid.server.store.berkeleydb.entry.Xid;
@@ -97,6 +98,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore
public static final int VERSION = 6;
public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
+ public static final String OVERFULL_SIZE_PROPERTY = "overfull-size";
+ public static final String UNDERFULL_SIZE_PROPERTY = "underfull-size";
private Environment _environment;
@@ -152,7 +155,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
private TransactionLogRecoveryHandler _tlogRecoveryHandler;
private ConfigurationRecoveryHandler _configRecoveryHandler;
-
+
+ private long _totalStoreSize;
+ private boolean _limitBusted;
+ private long _persistentSizeLowThreshold;
+ private long _persistentSizeHighThreshold;
+
private final EventManager _eventManager = new EventManager();
private String _storeLocation;
@@ -163,6 +171,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
_stateManager = new StateManager(_eventManager);
}
+ @Override
+ public void addEventListener(EventListener eventListener, Event... events)
+ {
+ _eventManager.addEventListener(eventListener, events);
+ }
+
public void configureConfigStore(String name,
ConfigurationRecoveryHandler recoveryHandler,
Configuration storeConfiguration) throws Exception
@@ -171,7 +185,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
_configRecoveryHandler = recoveryHandler;
- configure(name,storeConfiguration);
+ configure(name, storeConfiguration);
@@ -218,6 +232,13 @@ public abstract class AbstractBDBMessageStore implements MessageStore
final String storeLocation = storeConfig.getString(ENVIRONMENT_PATH_PROPERTY,
System.getProperty("QPID_WORK") + File.separator + "bdbstore" + File.separator + name);
+ _persistentSizeHighThreshold = storeConfig.getLong(OVERFULL_SIZE_PROPERTY, Long.MAX_VALUE);
+ _persistentSizeLowThreshold = storeConfig.getLong(UNDERFULL_SIZE_PROPERTY, _persistentSizeHighThreshold);
+ if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
+ {
+ _persistentSizeLowThreshold = _persistentSizeLowThreshold;
+ }
+
File environmentPath = new File(storeLocation);
if (!environmentPath.exists())
{
@@ -235,6 +256,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
setupStore(environmentPath, name);
}
+ @Override
+ public String getStoreLocation()
+ {
+ return _storeLocation;
+ }
+
/**
* Move the store state from INITIAL to ACTIVE without actually recovering.
*
@@ -257,6 +284,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore
new Upgrader(_environment, name).upgradeIfNecessary();
openDatabases();
+
+ _totalStoreSize = getSizeOnDisk();
}
protected Environment createEnvironment(File environmentPath) throws DatabaseException
@@ -553,6 +582,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
StorableMessageMetaData metaData = valueBinding.entryToObject(value);
StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, false);
+
mrh.message(message);
maxId = Math.max(maxId, messageId);
@@ -953,7 +983,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
UUIDTupleBinding.getInstance().objectToEntry(link.getId(), key);
DatabaseEntry value = new DatabaseEntry();
- LongBinding.longToEntry(link.getCreateTime(),value);
+ LongBinding.longToEntry(link.getCreateTime(), value);
StringMapBinding.getInstance().objectToEntry(link.getArguments(), value);
try
@@ -1247,7 +1277,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
/**
* Primarily for testing purposes.
*
- * @param queueName
+ * @param queueId
*
* @return a list of message ids for messages enqueued for a particular queue
*/
@@ -1698,7 +1728,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
synchronized void store(com.sleepycat.je.Transaction txn)
{
- if(_metaData != null)
+ if(unstored())
{
try
{
@@ -1728,14 +1758,19 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
+ private boolean unstored()
+ {
+ return _metaData != null;
+ }
+
public synchronized StoreFuture flushToStore()
{
- if(_metaData != null)
+ if(unstored())
{
com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
store(txn);
AbstractBDBMessageStore.this.commit(txn,true);
-
+ storedSizeChange(getMetaData().getContentSize());
}
return StoreFuture.IMMEDIATE_FUTURE;
}
@@ -1744,7 +1779,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
try
{
+ int delta = getMetaData().getContentSize();
AbstractBDBMessageStore.this.removeMessage(_messageId, false);
+ storedSizeChange(-delta);
+
}
catch (AMQStoreException e)
{
@@ -1756,6 +1794,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
private class BDBTransaction implements org.apache.qpid.server.store.Transaction
{
private com.sleepycat.je.Transaction _txn;
+ private int _storeSizeIncrease;
private BDBTransaction()
{
@@ -1773,7 +1812,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
if(message.getStoredMessage() instanceof StoredBDBMessage)
{
- ((StoredBDBMessage)message.getStoredMessage()).store(_txn);
+ final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage();
+ storedMessage.store(_txn);
+ _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
}
AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
@@ -1787,10 +1828,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
public void commitTran() throws AMQStoreException
{
AbstractBDBMessageStore.this.commitTranImpl(_txn, true);
+ AbstractBDBMessageStore.this.storedSizeChange(_storeSizeIncrease);
}
public StoreFuture commitTranAsync() throws AMQStoreException
{
+ AbstractBDBMessageStore.this.storedSizeChange(_storeSizeIncrease);
return AbstractBDBMessageStore.this.commitTranImpl(_txn, false);
}
@@ -1811,15 +1854,84 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
- @Override
- public void addEventListener(EventListener eventListener, Event... events)
+ private void storedSizeChange(final int delta)
{
- _eventManager.addEventListener(eventListener, events);
+ if(getPersistentSizeHighThreshold() > 0)
+ {
+ synchronized (this)
+ {
+ // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every
+ // time, so we do so only when there's been enough change that it is worth looking again. We do this by
+ // assuming the total size will change by less than twice the amount of the message data change.
+ long newSize = _totalStoreSize += 2*delta;
+
+ if(!_limitBusted && newSize > getPersistentSizeHighThreshold())
+ {
+ _totalStoreSize = getSizeOnDisk();
+
+ if(_totalStoreSize > getPersistentSizeHighThreshold())
+ {
+ _limitBusted = true;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ }
+ }
+ else if(_limitBusted && newSize < getPersistentSizeLowThreshold())
+ {
+ long oldSize = _totalStoreSize;
+ _totalStoreSize = getSizeOnDisk();
+
+ if(oldSize <= _totalStoreSize)
+ {
+
+ reduceSizeOnDisk();
+
+ _totalStoreSize = getSizeOnDisk();
+
+ }
+
+ if(_totalStoreSize < getPersistentSizeLowThreshold())
+ {
+ _limitBusted = false;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
+ }
+
+
+ }
+ }
+ }
}
- @Override
- public String getStoreLocation()
+ private void reduceSizeOnDisk()
{
- return _storeLocation;
+ _environment.getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false");
+ boolean cleaned = false;
+ while (_environment.cleanLog() > 0)
+ {
+ cleaned = true;
+ }
+ if (cleaned)
+ {
+ CheckpointConfig force = new CheckpointConfig();
+ force.setForce(true);
+ _environment.checkpoint(force);
+ }
+
+
+ _environment.getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true");
+ }
+
+ private long getSizeOnDisk()
+ {
+ return _environment.getStats(null).getTotalLogSize();
+ }
+
+ private long getPersistentSizeLowThreshold()
+ {
+ return _persistentSizeLowThreshold;
+ }
+
+ private long getPersistentSizeHighThreshold()
+ {
+ return _persistentSizeHighThreshold;
}
}
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 9f7eb4bfd9..b414441b92 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -21,12 +21,19 @@
package org.apache.qpid.server.store.berkeleydb;
import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.store.Event;
+import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
@@ -49,6 +56,8 @@ public class BDBMessageStore extends AbstractBDBMessageStore
private final CommitThread _commitThread = new CommitThread("Commit-Thread");
+ private final Map<Event, List<EventListener>> _eventListeners = new HashMap<Event, List<EventListener>>();
+
@Override
protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException
{
@@ -64,6 +73,17 @@ public class BDBMessageStore extends AbstractBDBMessageStore
// This is what allows the creation of the store if it does not already exist.
envConfig.setAllowCreate(true);
envConfig.setTransactional(true);
+
+ Properties props = System.getProperties();
+
+ for(String propName : props.stringPropertyNames())
+ {
+ if(propName.startsWith("qpid.bdb.envconfig.je."))
+ {
+ envConfig.setConfigParam(propName.substring(19), props.getProperty(propName));
+ }
+ }
+
envConfig.setConfigParam("je.lock.nLockTables", "7");
// Added to help diagnosis of Deadlock issue