diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java | 326 |
1 files changed, 278 insertions, 48 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index 0371cdcfcb..154d7e6535 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -31,6 +31,7 @@ import java.lang.ref.SoftReference; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.sql.Blob; +import java.sql.CallableStatement; import java.sql.Connection; import java.sql.Driver; import java.sql.DriverManager; @@ -59,6 +60,7 @@ import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler; import org.apache.qpid.server.store.ConfiguredObjectHelper; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.Event; @@ -77,6 +79,9 @@ import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; /** * An implementation of a {@link MessageStore} that uses Apache Derby as the persistence @@ -111,6 +116,7 @@ public class DerbyMessageStore implements MessageStore private static Class<Driver> DRIVER_CLASS; + public static final String MEMORY_STORE_LOCATION = ":memory:"; private final AtomicLong _messageId = new AtomicLong(0); private AtomicBoolean _closed = new AtomicBoolean(false); @@ -230,10 +236,17 @@ public class DerbyMessageStore implements MessageStore private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006"; + private static final String DERBY_STORE_TYPE = "DERBY"; + private final StateManager _stateManager; - + private final EventManager _eventManager = new EventManager(); + private long _totalStoreSize; + private boolean _limitBusted; + private long _persistentSizeLowThreshold; + private long _persistentSizeHighThreshold; + private MessageStoreRecoveryHandler _messageRecoveryHandler; private TransactionLogRecoveryHandler _tlogRecoveryHandler; @@ -253,7 +266,7 @@ public class DerbyMessageStore implements MessageStore ConfigurationRecoveryHandler configRecoveryHandler, Configuration storeConfiguration) throws Exception { - _stateManager.attainState(State.CONFIGURING); + _stateManager.attainState(State.INITIALISING); _configRecoveryHandler = configRecoveryHandler; commonConfiguration(name, storeConfiguration); @@ -269,13 +282,13 @@ public class DerbyMessageStore implements MessageStore _tlogRecoveryHandler = tlogRecoveryHandler; _messageRecoveryHandler = recoveryHandler; - _stateManager.attainState(State.CONFIGURED); + _stateManager.attainState(State.INITIALISED); } @Override public void activate() throws Exception { - _stateManager.attainState(State.RECOVERING); + _stateManager.attainState(State.ACTIVATING); // this recovers durable exchanges, queues, and bindings recoverConfiguration(_configRecoveryHandler); @@ -296,19 +309,39 @@ public class DerbyMessageStore implements MessageStore final String databasePath = storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK") + File.separator + "derbyDB"); - File environmentPath = new File(databasePath); - if (!environmentPath.exists()) + if(!MEMORY_STORE_LOCATION.equals(databasePath)) { - if (!environmentPath.mkdirs()) + File environmentPath = new File(databasePath); + if (!environmentPath.exists()) { - throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. " - + "Ensure the path is correct and that the permissions are correct."); + if (!environmentPath.mkdirs()) + { + throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. " + + "Ensure the path is correct and that the permissions are correct."); + } } } _storeLocation = databasePath; + _persistentSizeHighThreshold = storeConfiguration.getLong(MessageStoreConstants.OVERFULL_SIZE_PROPERTY, -1l); + _persistentSizeLowThreshold = storeConfiguration.getLong(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY, _persistentSizeHighThreshold); + if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) + { + _persistentSizeLowThreshold = _persistentSizeHighThreshold; + } + createOrOpenDatabase(name, databasePath); + + Connection conn = newAutoCommitConnection();; + try + { + _totalStoreSize = getSizeOnDisk(conn); + } + finally + { + conn.close(); + } } private static synchronized void initialiseDriver() throws ClassNotFoundException @@ -322,7 +355,7 @@ public class DerbyMessageStore implements MessageStore private void createOrOpenDatabase(String name, final String environmentPath) throws SQLException { //FIXME this the _vhost name should not be added here, but derby wont use an empty directory as was possibly just created. - _connectionURL = "jdbc:derby:" + environmentPath + "/" + name + ";create=true"; + _connectionURL = "jdbc:derby" + (environmentPath.equals(MEMORY_STORE_LOCATION) ? environmentPath : ":" + environmentPath + "/") + name + ";create=true"; Connection conn = newAutoCommitConnection(); @@ -529,16 +562,17 @@ public class DerbyMessageStore implements MessageStore try { List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects(); - ConfigurationRecoveryHandler.QueueRecoveryHandler qrh = recoveryHandler.begin(this); - _configuredObjectHelper.recoverQueues(qrh, configuredObjects); - ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh = qrh.completeQueueRecovery(); + ExchangeRecoveryHandler erh = recoveryHandler.begin(this); _configuredObjectHelper.recoverExchanges(erh, configuredObjects); - ConfigurationRecoveryHandler.BindingRecoveryHandler brh = erh.completeExchangeRecovery(); + QueueRecoveryHandler qrh = erh.completeExchangeRecovery(); + _configuredObjectHelper.recoverQueues(qrh, configuredObjects); + + BindingRecoveryHandler brh = qrh.completeQueueRecovery(); _configuredObjectHelper.recoverBindings(brh, configuredObjects); - ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery(); + BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery(); recoverBrokerLinks(lrh); } catch (SQLException e) @@ -689,7 +723,7 @@ public class DerbyMessageStore implements MessageStore public void close() throws Exception { _closed.getAndSet(true); - _stateManager.stateTransition(State.ACTIVE, State.CLOSING); + _stateManager.attainState(State.CLOSING); try { @@ -710,7 +744,7 @@ public class DerbyMessageStore implements MessageStore } } - _stateManager.stateTransition(State.CLOSING, State.CLOSED); + _stateManager.attainState(State.CLOSED); } @Override @@ -956,8 +990,8 @@ public class DerbyMessageStore implements MessageStore try { - stmt.setLong(1, link.getId().getLeastSignificantBits()); - stmt.setLong(2, link.getId().getMostSignificantBits()); + stmt.setLong(1, link.getQMFId().getLeastSignificantBits()); + stmt.setLong(2, link.getQMFId().getMostSignificantBits()); ResultSet rs = stmt.executeQuery(); try { @@ -970,8 +1004,8 @@ public class DerbyMessageStore implements MessageStore try { - insertStmt.setLong(1, link.getId().getLeastSignificantBits()); - insertStmt.setLong(2, link.getId().getMostSignificantBits()); + insertStmt.setLong(1, link.getQMFId().getLeastSignificantBits()); + insertStmt.setLong(2, link.getQMFId().getMostSignificantBits()); insertStmt.setLong(3, link.getCreateTime()); byte[] argumentBytes = convertStringMapToBytes(link.getArguments()); @@ -1048,8 +1082,8 @@ public class DerbyMessageStore implements MessageStore { conn = newAutoCommitConnection(); stmt = conn.prepareStatement(DELETE_FROM_LINKS); - stmt.setLong(1, link.getId().getLeastSignificantBits()); - stmt.setLong(2, link.getId().getMostSignificantBits()); + stmt.setLong(1, link.getQMFId().getLeastSignificantBits()); + stmt.setLong(2, link.getQMFId().getMostSignificantBits()); int results = stmt.executeUpdate(); if (results == 0) @@ -1085,7 +1119,7 @@ public class DerbyMessageStore implements MessageStore try { - UUID id = bridge.getId(); + UUID id = bridge.getQMFId(); stmt.setLong(1, id.getLeastSignificantBits()); stmt.setLong(2, id.getMostSignificantBits()); ResultSet rs = stmt.executeQuery(); @@ -1105,7 +1139,7 @@ public class DerbyMessageStore implements MessageStore insertStmt.setLong(3, bridge.getCreateTime()); - UUID linkId = bridge.getLink().getId(); + UUID linkId = bridge.getLink().getQMFId(); insertStmt.setLong(4, linkId.getLeastSignificantBits()); insertStmt.setLong(5, linkId.getMostSignificantBits()); @@ -1151,8 +1185,8 @@ public class DerbyMessageStore implements MessageStore { conn = newAutoCommitConnection(); stmt = conn.prepareStatement(DELETE_FROM_BRIDGES); - stmt.setLong(1, bridge.getId().getLeastSignificantBits()); - stmt.setLong(2, bridge.getId().getMostSignificantBits()); + stmt.setLong(1, bridge.getQMFId().getLeastSignificantBits()); + stmt.setLong(2, bridge.getQMFId().getMostSignificantBits()); int results = stmt.executeUpdate(); if (results == 0) @@ -1541,7 +1575,7 @@ public class DerbyMessageStore implements MessageStore buf = buf.slice(); MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]]; StorableMessageMetaData metaData = type.getFactory().createMetaData(buf); - StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, false); + StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, true); messageHandler.message(message); } @@ -1921,6 +1955,7 @@ public class DerbyMessageStore implements MessageStore private class DerbyTransaction implements Transaction { private final ConnectionWrapper _connWrapper; + private int _storeSizeIncrease; private DerbyTransaction() @@ -1938,18 +1973,19 @@ public class DerbyMessageStore implements MessageStore @Override public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException { - if(message.getStoredMessage() instanceof StoredDerbyMessage) + final StoredMessage storedMessage = message.getStoredMessage(); + if(storedMessage instanceof StoredDerbyMessage) { try { - ((StoredDerbyMessage)message.getStoredMessage()).store(_connWrapper.getConnection()); + ((StoredDerbyMessage) storedMessage).store(_connWrapper.getConnection()); } catch (SQLException e) { throw new AMQStoreException("Exception on enqueuing message " + _messageId, e); } } - + _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber()); } @@ -1964,12 +2000,15 @@ public class DerbyMessageStore implements MessageStore public void commitTran() throws AMQStoreException { DerbyMessageStore.this.commitTran(_connWrapper); + storedSizeChange(_storeSizeIncrease); } @Override public StoreFuture commitTranAsync() throws AMQStoreException { - return DerbyMessageStore.this.commitTranAsync(_connWrapper); + final StoreFuture storeFuture = DerbyMessageStore.this.commitTranAsync(_connWrapper); + storedSizeChange(_storeSizeIncrease); + return storeFuture; } @Override @@ -1998,6 +2037,8 @@ public class DerbyMessageStore implements MessageStore { private final long _messageId; + private final boolean _isRecovered; + private StorableMessageMetaData _metaData; private volatile SoftReference<StorableMessageMetaData> _metaDataRef; private byte[] _data; @@ -2006,21 +2047,21 @@ public class DerbyMessageStore implements MessageStore StoredDerbyMessage(long messageId, StorableMessageMetaData metaData) { - this(messageId, metaData, true); + this(messageId, metaData, false); } StoredDerbyMessage(long messageId, - StorableMessageMetaData metaData, boolean persist) + StorableMessageMetaData metaData, boolean isRecovered) { _messageId = messageId; + _isRecovered = isRecovered; - - _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); - if(persist) + if(!_isRecovered) { _metaData = metaData; } + _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); } @Override @@ -2101,16 +2142,17 @@ public class DerbyMessageStore implements MessageStore @Override public synchronized StoreFuture flushToStore() { + Connection conn = null; try { - if(_metaData != null) + if(!stored()) { - Connection conn = newConnection(); + conn = newConnection(); store(conn); conn.commit(); - conn.close(); + storedSizeChange(getMetaData().getContentSize()); } } catch (SQLException e) @@ -2121,12 +2163,24 @@ public class DerbyMessageStore implements MessageStore } throw new RuntimeException(e); } + finally + { + closeConnection(conn); + } return StoreFuture.IMMEDIATE_FUTURE; } + @Override + public void remove() + { + int delta = getMetaData().getContentSize(); + DerbyMessageStore.this.removeMessage(_messageId); + storedSizeChange(-delta); + } + private synchronized void store(final Connection conn) throws SQLException { - if(_metaData != null) + if (!stored()) { try { @@ -2139,18 +2193,17 @@ public class DerbyMessageStore implements MessageStore _metaData = null; _data = null; } - } - if(_logger.isDebugEnabled()) - { - _logger.debug("Storing message " + _messageId + " to store"); + if(_logger.isDebugEnabled()) + { + _logger.debug("Storing message " + _messageId + " to store"); + } } } - @Override - public void remove() + private boolean stored() { - DerbyMessageStore.this.removeMessage(_messageId); + return _metaData == null || _isRecovered; } } @@ -2446,4 +2499,181 @@ public class DerbyMessageStore implements MessageStore } return results; } + + private synchronized void storedSizeChange(final int delta) + { + if(getPersistentSizeHighThreshold() > 0) + { + synchronized(this) + { + // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every + // time, so we do so only when there's been enough change that it is worth looking again. We do this by + // assuming the total size will change by less than twice the amount of the message data change. + long newSize = _totalStoreSize += 3*delta; + + Connection conn = null; + try + { + + if(!_limitBusted && newSize > getPersistentSizeHighThreshold()) + { + conn = newAutoCommitConnection(); + _totalStoreSize = getSizeOnDisk(conn); + if(_totalStoreSize > getPersistentSizeHighThreshold()) + { + _limitBusted = true; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); + } + } + else if(_limitBusted && newSize < getPersistentSizeLowThreshold()) + { + long oldSize = _totalStoreSize; + conn = newAutoCommitConnection(); + _totalStoreSize = getSizeOnDisk(conn); + if(oldSize <= _totalStoreSize) + { + + reduceSizeOnDisk(conn); + + _totalStoreSize = getSizeOnDisk(conn); + } + + if(_totalStoreSize < getPersistentSizeLowThreshold()) + { + _limitBusted = false; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); + } + + + } + } + catch (SQLException e) + { + closeConnection(conn); + throw new RuntimeException("Exception will processing store size change", e); + } + } + } + } + + private void reduceSizeOnDisk(Connection conn) + { + CallableStatement cs = null; + PreparedStatement stmt = null; + try + { + String tableQuery = + "SELECT S.SCHEMANAME, T.TABLENAME FROM SYS.SYSSCHEMAS S, SYS.SYSTABLES T WHERE S.SCHEMAID = T.SCHEMAID AND T.TABLETYPE='T'"; + stmt = conn.prepareStatement(tableQuery); + ResultSet rs = null; + + List<String> schemas = new ArrayList<String>(); + List<String> tables = new ArrayList<String>(); + + try + { + rs = stmt.executeQuery(); + while(rs.next()) + { + schemas.add(rs.getString(1)); + tables.add(rs.getString(2)); + } + } + finally + { + if(rs != null) + { + rs.close(); + } + } + + + cs = conn.prepareCall + ("CALL SYSCS_UTIL.SYSCS_COMPRESS_TABLE(?, ?, ?)"); + + for(int i = 0; i < schemas.size(); i++) + { + cs.setString(1, schemas.get(i)); + cs.setString(2, tables.get(i)); + cs.setShort(3, (short) 0); + cs.execute(); + } + } + catch (SQLException e) + { + closeConnection(conn); + throw new RuntimeException("Error reducing on disk size", e); + } + finally + { + closePreparedStatement(stmt); + closePreparedStatement(cs); + } + + } + + private long getSizeOnDisk(Connection conn) + { + PreparedStatement stmt = null; + try + { + String sizeQuery = "SELECT SUM(T2.NUMALLOCATEDPAGES * T2.PAGESIZE) TOTALSIZE" + + " FROM " + + " SYS.SYSTABLES systabs," + + " TABLE (SYSCS_DIAG.SPACE_TABLE(systabs.tablename)) AS T2" + + " WHERE systabs.tabletype = 'T'"; + + stmt = conn.prepareStatement(sizeQuery); + + ResultSet rs = null; + long size = 0l; + + try + { + rs = stmt.executeQuery(); + while(rs.next()) + { + size = rs.getLong(1); + } + } + finally + { + if(rs != null) + { + rs.close(); + } + } + + return size; + + } + catch (SQLException e) + { + closeConnection(conn); + throw new RuntimeException("Error establishing on disk size", e); + } + finally + { + closePreparedStatement(stmt); + } + + } + + + private long getPersistentSizeLowThreshold() + { + return _persistentSizeLowThreshold; + } + + private long getPersistentSizeHighThreshold() + { + return _persistentSizeHighThreshold; + } + + @Override + public String getStoreType() + { + return DERBY_STORE_TYPE; + } + }
\ No newline at end of file |