summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java326
1 files changed, 278 insertions, 48 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index 0371cdcfcb..154d7e6535 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -31,6 +31,7 @@ import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.sql.Blob;
+import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
@@ -59,6 +60,7 @@ import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler;
import org.apache.qpid.server.store.ConfiguredObjectHelper;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.Event;
@@ -77,6 +79,9 @@ import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
/**
* An implementation of a {@link MessageStore} that uses Apache Derby as the persistence
@@ -111,6 +116,7 @@ public class DerbyMessageStore implements MessageStore
private static Class<Driver> DRIVER_CLASS;
+ public static final String MEMORY_STORE_LOCATION = ":memory:";
private final AtomicLong _messageId = new AtomicLong(0);
private AtomicBoolean _closed = new AtomicBoolean(false);
@@ -230,10 +236,17 @@ public class DerbyMessageStore implements MessageStore
private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
+ private static final String DERBY_STORE_TYPE = "DERBY";
+
private final StateManager _stateManager;
-
+
private final EventManager _eventManager = new EventManager();
+ private long _totalStoreSize;
+ private boolean _limitBusted;
+ private long _persistentSizeLowThreshold;
+ private long _persistentSizeHighThreshold;
+
private MessageStoreRecoveryHandler _messageRecoveryHandler;
private TransactionLogRecoveryHandler _tlogRecoveryHandler;
@@ -253,7 +266,7 @@ public class DerbyMessageStore implements MessageStore
ConfigurationRecoveryHandler configRecoveryHandler,
Configuration storeConfiguration) throws Exception
{
- _stateManager.attainState(State.CONFIGURING);
+ _stateManager.attainState(State.INITIALISING);
_configRecoveryHandler = configRecoveryHandler;
commonConfiguration(name, storeConfiguration);
@@ -269,13 +282,13 @@ public class DerbyMessageStore implements MessageStore
_tlogRecoveryHandler = tlogRecoveryHandler;
_messageRecoveryHandler = recoveryHandler;
- _stateManager.attainState(State.CONFIGURED);
+ _stateManager.attainState(State.INITIALISED);
}
@Override
public void activate() throws Exception
{
- _stateManager.attainState(State.RECOVERING);
+ _stateManager.attainState(State.ACTIVATING);
// this recovers durable exchanges, queues, and bindings
recoverConfiguration(_configRecoveryHandler);
@@ -296,19 +309,39 @@ public class DerbyMessageStore implements MessageStore
final String databasePath = storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK")
+ File.separator + "derbyDB");
- File environmentPath = new File(databasePath);
- if (!environmentPath.exists())
+ if(!MEMORY_STORE_LOCATION.equals(databasePath))
{
- if (!environmentPath.mkdirs())
+ File environmentPath = new File(databasePath);
+ if (!environmentPath.exists())
{
- throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
- + "Ensure the path is correct and that the permissions are correct.");
+ if (!environmentPath.mkdirs())
+ {
+ throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
+ + "Ensure the path is correct and that the permissions are correct.");
+ }
}
}
_storeLocation = databasePath;
+ _persistentSizeHighThreshold = storeConfiguration.getLong(MessageStoreConstants.OVERFULL_SIZE_PROPERTY, -1l);
+ _persistentSizeLowThreshold = storeConfiguration.getLong(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY, _persistentSizeHighThreshold);
+ if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
+ {
+ _persistentSizeLowThreshold = _persistentSizeHighThreshold;
+ }
+
createOrOpenDatabase(name, databasePath);
+
+ Connection conn = newAutoCommitConnection();;
+ try
+ {
+ _totalStoreSize = getSizeOnDisk(conn);
+ }
+ finally
+ {
+ conn.close();
+ }
}
private static synchronized void initialiseDriver() throws ClassNotFoundException
@@ -322,7 +355,7 @@ public class DerbyMessageStore implements MessageStore
private void createOrOpenDatabase(String name, final String environmentPath) throws SQLException
{
//FIXME this the _vhost name should not be added here, but derby wont use an empty directory as was possibly just created.
- _connectionURL = "jdbc:derby:" + environmentPath + "/" + name + ";create=true";
+ _connectionURL = "jdbc:derby" + (environmentPath.equals(MEMORY_STORE_LOCATION) ? environmentPath : ":" + environmentPath + "/") + name + ";create=true";
Connection conn = newAutoCommitConnection();
@@ -529,16 +562,17 @@ public class DerbyMessageStore implements MessageStore
try
{
List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
- ConfigurationRecoveryHandler.QueueRecoveryHandler qrh = recoveryHandler.begin(this);
- _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
- ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh = qrh.completeQueueRecovery();
+ ExchangeRecoveryHandler erh = recoveryHandler.begin(this);
_configuredObjectHelper.recoverExchanges(erh, configuredObjects);
- ConfigurationRecoveryHandler.BindingRecoveryHandler brh = erh.completeExchangeRecovery();
+ QueueRecoveryHandler qrh = erh.completeExchangeRecovery();
+ _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
+
+ BindingRecoveryHandler brh = qrh.completeQueueRecovery();
_configuredObjectHelper.recoverBindings(brh, configuredObjects);
- ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
+ BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
recoverBrokerLinks(lrh);
}
catch (SQLException e)
@@ -689,7 +723,7 @@ public class DerbyMessageStore implements MessageStore
public void close() throws Exception
{
_closed.getAndSet(true);
- _stateManager.stateTransition(State.ACTIVE, State.CLOSING);
+ _stateManager.attainState(State.CLOSING);
try
{
@@ -710,7 +744,7 @@ public class DerbyMessageStore implements MessageStore
}
}
- _stateManager.stateTransition(State.CLOSING, State.CLOSED);
+ _stateManager.attainState(State.CLOSED);
}
@Override
@@ -956,8 +990,8 @@ public class DerbyMessageStore implements MessageStore
try
{
- stmt.setLong(1, link.getId().getLeastSignificantBits());
- stmt.setLong(2, link.getId().getMostSignificantBits());
+ stmt.setLong(1, link.getQMFId().getLeastSignificantBits());
+ stmt.setLong(2, link.getQMFId().getMostSignificantBits());
ResultSet rs = stmt.executeQuery();
try
{
@@ -970,8 +1004,8 @@ public class DerbyMessageStore implements MessageStore
try
{
- insertStmt.setLong(1, link.getId().getLeastSignificantBits());
- insertStmt.setLong(2, link.getId().getMostSignificantBits());
+ insertStmt.setLong(1, link.getQMFId().getLeastSignificantBits());
+ insertStmt.setLong(2, link.getQMFId().getMostSignificantBits());
insertStmt.setLong(3, link.getCreateTime());
byte[] argumentBytes = convertStringMapToBytes(link.getArguments());
@@ -1048,8 +1082,8 @@ public class DerbyMessageStore implements MessageStore
{
conn = newAutoCommitConnection();
stmt = conn.prepareStatement(DELETE_FROM_LINKS);
- stmt.setLong(1, link.getId().getLeastSignificantBits());
- stmt.setLong(2, link.getId().getMostSignificantBits());
+ stmt.setLong(1, link.getQMFId().getLeastSignificantBits());
+ stmt.setLong(2, link.getQMFId().getMostSignificantBits());
int results = stmt.executeUpdate();
if (results == 0)
@@ -1085,7 +1119,7 @@ public class DerbyMessageStore implements MessageStore
try
{
- UUID id = bridge.getId();
+ UUID id = bridge.getQMFId();
stmt.setLong(1, id.getLeastSignificantBits());
stmt.setLong(2, id.getMostSignificantBits());
ResultSet rs = stmt.executeQuery();
@@ -1105,7 +1139,7 @@ public class DerbyMessageStore implements MessageStore
insertStmt.setLong(3, bridge.getCreateTime());
- UUID linkId = bridge.getLink().getId();
+ UUID linkId = bridge.getLink().getQMFId();
insertStmt.setLong(4, linkId.getLeastSignificantBits());
insertStmt.setLong(5, linkId.getMostSignificantBits());
@@ -1151,8 +1185,8 @@ public class DerbyMessageStore implements MessageStore
{
conn = newAutoCommitConnection();
stmt = conn.prepareStatement(DELETE_FROM_BRIDGES);
- stmt.setLong(1, bridge.getId().getLeastSignificantBits());
- stmt.setLong(2, bridge.getId().getMostSignificantBits());
+ stmt.setLong(1, bridge.getQMFId().getLeastSignificantBits());
+ stmt.setLong(2, bridge.getQMFId().getMostSignificantBits());
int results = stmt.executeUpdate();
if (results == 0)
@@ -1541,7 +1575,7 @@ public class DerbyMessageStore implements MessageStore
buf = buf.slice();
MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
- StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, false);
+ StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, true);
messageHandler.message(message);
}
@@ -1921,6 +1955,7 @@ public class DerbyMessageStore implements MessageStore
private class DerbyTransaction implements Transaction
{
private final ConnectionWrapper _connWrapper;
+ private int _storeSizeIncrease;
private DerbyTransaction()
@@ -1938,18 +1973,19 @@ public class DerbyMessageStore implements MessageStore
@Override
public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
- if(message.getStoredMessage() instanceof StoredDerbyMessage)
+ final StoredMessage storedMessage = message.getStoredMessage();
+ if(storedMessage instanceof StoredDerbyMessage)
{
try
{
- ((StoredDerbyMessage)message.getStoredMessage()).store(_connWrapper.getConnection());
+ ((StoredDerbyMessage) storedMessage).store(_connWrapper.getConnection());
}
catch (SQLException e)
{
throw new AMQStoreException("Exception on enqueuing message " + _messageId, e);
}
}
-
+ _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
}
@@ -1964,12 +2000,15 @@ public class DerbyMessageStore implements MessageStore
public void commitTran() throws AMQStoreException
{
DerbyMessageStore.this.commitTran(_connWrapper);
+ storedSizeChange(_storeSizeIncrease);
}
@Override
public StoreFuture commitTranAsync() throws AMQStoreException
{
- return DerbyMessageStore.this.commitTranAsync(_connWrapper);
+ final StoreFuture storeFuture = DerbyMessageStore.this.commitTranAsync(_connWrapper);
+ storedSizeChange(_storeSizeIncrease);
+ return storeFuture;
}
@Override
@@ -1998,6 +2037,8 @@ public class DerbyMessageStore implements MessageStore
{
private final long _messageId;
+ private final boolean _isRecovered;
+
private StorableMessageMetaData _metaData;
private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
private byte[] _data;
@@ -2006,21 +2047,21 @@ public class DerbyMessageStore implements MessageStore
StoredDerbyMessage(long messageId, StorableMessageMetaData metaData)
{
- this(messageId, metaData, true);
+ this(messageId, metaData, false);
}
StoredDerbyMessage(long messageId,
- StorableMessageMetaData metaData, boolean persist)
+ StorableMessageMetaData metaData, boolean isRecovered)
{
_messageId = messageId;
+ _isRecovered = isRecovered;
-
- _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
- if(persist)
+ if(!_isRecovered)
{
_metaData = metaData;
}
+ _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
}
@Override
@@ -2101,16 +2142,17 @@ public class DerbyMessageStore implements MessageStore
@Override
public synchronized StoreFuture flushToStore()
{
+ Connection conn = null;
try
{
- if(_metaData != null)
+ if(!stored())
{
- Connection conn = newConnection();
+ conn = newConnection();
store(conn);
conn.commit();
- conn.close();
+ storedSizeChange(getMetaData().getContentSize());
}
}
catch (SQLException e)
@@ -2121,12 +2163,24 @@ public class DerbyMessageStore implements MessageStore
}
throw new RuntimeException(e);
}
+ finally
+ {
+ closeConnection(conn);
+ }
return StoreFuture.IMMEDIATE_FUTURE;
}
+ @Override
+ public void remove()
+ {
+ int delta = getMetaData().getContentSize();
+ DerbyMessageStore.this.removeMessage(_messageId);
+ storedSizeChange(-delta);
+ }
+
private synchronized void store(final Connection conn) throws SQLException
{
- if(_metaData != null)
+ if (!stored())
{
try
{
@@ -2139,18 +2193,17 @@ public class DerbyMessageStore implements MessageStore
_metaData = null;
_data = null;
}
- }
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Storing message " + _messageId + " to store");
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Storing message " + _messageId + " to store");
+ }
}
}
- @Override
- public void remove()
+ private boolean stored()
{
- DerbyMessageStore.this.removeMessage(_messageId);
+ return _metaData == null || _isRecovered;
}
}
@@ -2446,4 +2499,181 @@ public class DerbyMessageStore implements MessageStore
}
return results;
}
+
+ private synchronized void storedSizeChange(final int delta)
+ {
+ if(getPersistentSizeHighThreshold() > 0)
+ {
+ synchronized(this)
+ {
+ // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every
+ // time, so we do so only when there's been enough change that it is worth looking again. We do this by
+ // assuming the total size will change by less than twice the amount of the message data change.
+ long newSize = _totalStoreSize += 3*delta;
+
+ Connection conn = null;
+ try
+ {
+
+ if(!_limitBusted && newSize > getPersistentSizeHighThreshold())
+ {
+ conn = newAutoCommitConnection();
+ _totalStoreSize = getSizeOnDisk(conn);
+ if(_totalStoreSize > getPersistentSizeHighThreshold())
+ {
+ _limitBusted = true;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ }
+ }
+ else if(_limitBusted && newSize < getPersistentSizeLowThreshold())
+ {
+ long oldSize = _totalStoreSize;
+ conn = newAutoCommitConnection();
+ _totalStoreSize = getSizeOnDisk(conn);
+ if(oldSize <= _totalStoreSize)
+ {
+
+ reduceSizeOnDisk(conn);
+
+ _totalStoreSize = getSizeOnDisk(conn);
+ }
+
+ if(_totalStoreSize < getPersistentSizeLowThreshold())
+ {
+ _limitBusted = false;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
+ }
+
+
+ }
+ }
+ catch (SQLException e)
+ {
+ closeConnection(conn);
+ throw new RuntimeException("Exception will processing store size change", e);
+ }
+ }
+ }
+ }
+
+ private void reduceSizeOnDisk(Connection conn)
+ {
+ CallableStatement cs = null;
+ PreparedStatement stmt = null;
+ try
+ {
+ String tableQuery =
+ "SELECT S.SCHEMANAME, T.TABLENAME FROM SYS.SYSSCHEMAS S, SYS.SYSTABLES T WHERE S.SCHEMAID = T.SCHEMAID AND T.TABLETYPE='T'";
+ stmt = conn.prepareStatement(tableQuery);
+ ResultSet rs = null;
+
+ List<String> schemas = new ArrayList<String>();
+ List<String> tables = new ArrayList<String>();
+
+ try
+ {
+ rs = stmt.executeQuery();
+ while(rs.next())
+ {
+ schemas.add(rs.getString(1));
+ tables.add(rs.getString(2));
+ }
+ }
+ finally
+ {
+ if(rs != null)
+ {
+ rs.close();
+ }
+ }
+
+
+ cs = conn.prepareCall
+ ("CALL SYSCS_UTIL.SYSCS_COMPRESS_TABLE(?, ?, ?)");
+
+ for(int i = 0; i < schemas.size(); i++)
+ {
+ cs.setString(1, schemas.get(i));
+ cs.setString(2, tables.get(i));
+ cs.setShort(3, (short) 0);
+ cs.execute();
+ }
+ }
+ catch (SQLException e)
+ {
+ closeConnection(conn);
+ throw new RuntimeException("Error reducing on disk size", e);
+ }
+ finally
+ {
+ closePreparedStatement(stmt);
+ closePreparedStatement(cs);
+ }
+
+ }
+
+ private long getSizeOnDisk(Connection conn)
+ {
+ PreparedStatement stmt = null;
+ try
+ {
+ String sizeQuery = "SELECT SUM(T2.NUMALLOCATEDPAGES * T2.PAGESIZE) TOTALSIZE" +
+ " FROM " +
+ " SYS.SYSTABLES systabs," +
+ " TABLE (SYSCS_DIAG.SPACE_TABLE(systabs.tablename)) AS T2" +
+ " WHERE systabs.tabletype = 'T'";
+
+ stmt = conn.prepareStatement(sizeQuery);
+
+ ResultSet rs = null;
+ long size = 0l;
+
+ try
+ {
+ rs = stmt.executeQuery();
+ while(rs.next())
+ {
+ size = rs.getLong(1);
+ }
+ }
+ finally
+ {
+ if(rs != null)
+ {
+ rs.close();
+ }
+ }
+
+ return size;
+
+ }
+ catch (SQLException e)
+ {
+ closeConnection(conn);
+ throw new RuntimeException("Error establishing on disk size", e);
+ }
+ finally
+ {
+ closePreparedStatement(stmt);
+ }
+
+ }
+
+
+ private long getPersistentSizeLowThreshold()
+ {
+ return _persistentSizeLowThreshold;
+ }
+
+ private long getPersistentSizeHighThreshold()
+ {
+ return _persistentSizeHighThreshold;
+ }
+
+ @Override
+ public String getStoreType()
+ {
+ return DERBY_STORE_TYPE;
+ }
+
} \ No newline at end of file